aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-04-04 18:11:53 +0000
committerChristian Grothoff <christian@grothoff.org>2011-04-04 18:11:53 +0000
commit7351d04517a2a1ad48880d4fa46e780068929d6e (patch)
treeaa9bc044fad2f7747ccb57cabb7bf3ffcbfd1ae1 /src/fs/gnunet-service-fs.c
parent2663c23e2b0944dfb015332f10ff65cfc802588a (diff)
downloadgnunet-7351d04517a2a1ad48880d4fa46e780068929d6e.tar.gz
gnunet-7351d04517a2a1ad48880d4fa46e780068929d6e.zip
move
Diffstat (limited to 'src/fs/gnunet-service-fs.c')
-rw-r--r--src/fs/gnunet-service-fs.c4726
1 files changed, 244 insertions, 4482 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index 35d89c50f..20a98e6f2 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors) 3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -22,6 +22,11 @@
22 * @file fs/gnunet-service-fs.c 22 * @file fs/gnunet-service-fs.c
23 * @brief gnunet anonymity protocol implementation 23 * @brief gnunet anonymity protocol implementation
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 *
26 * To use:
27 * - consider re-issue GSF_dht_lookup_ after non-DHT reply received
28 * - implement 'SUPPORT_DELAYS'
29 *
25 */ 30 */
26#include "platform.h" 31#include "platform.h"
27#include <float.h> 32#include <float.h>
@@ -36,30 +41,15 @@
36#include "gnunet_statistics_service.h" 41#include "gnunet_statistics_service.h"
37#include "gnunet_transport_service.h" 42#include "gnunet_transport_service.h"
38#include "gnunet_util_lib.h" 43#include "gnunet_util_lib.h"
44#include "gnunet-service-fs_cp.h"
39#include "gnunet-service-fs_indexing.h" 45#include "gnunet-service-fs_indexing.h"
46#include "gnunet-service-fs_lc.h"
47#include "gnunet-service-fs_pe.h"
48#include "gnunet-service-fs_pr.h"
49#include "gnunet-service-fs_push.h"
50#include "gnunet-service-fs_put.h"
40#include "fs.h" 51#include "fs.h"
41 52
42#define DEBUG_FS GNUNET_YES
43
44/**
45 * Should we introduce random latency in processing? Required for proper
46 * implementation of GAP, but can be disabled for performance evaluation of
47 * the basic routing algorithm.
48 *
49 * Note that with delays enabled, performance can be significantly lower
50 * (several orders of magnitude in 2-peer test runs); if you want to
51 * measure throughput of other components, set this to NO. Also, you
52 * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
53 * a rather wasteful mode of operation (that might still get the highest
54 * throughput overall).
55 *
56 * Performance measurements (for 50 MB file, 2 peers):
57 *
58 * - Without delays: 3300 kb/s
59 * - With delays: 101 kb/s
60 */
61#define SUPPORT_DELAYS GNUNET_NO
62
63/** 53/**
64 * Size for the hash map for DHT requests from the FS 54 * Size for the hash map for DHT requests from the FS
65 * service. Should be about the number of concurrent 55 * service. Should be about the number of concurrent
@@ -67,17 +57,6 @@
67 */ 57 */
68#define FS_DHT_HT_SIZE 1024 58#define FS_DHT_HT_SIZE 1024
69 59
70/**
71 * At what frequency should our datastore load decrease
72 * automatically (since if we don't use it, clearly the
73 * load must be going down).
74 */
75#define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
76
77/**
78 * How often do we flush trust values to disk?
79 */
80#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
81 60
82/** 61/**
83 * How quickly do we age cover traffic? At the given 62 * How quickly do we age cover traffic? At the given
@@ -86,830 +65,33 @@
86 */ 65 */
87#define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) 66#define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
88 67
89/**
90 * How often do we at most PUT content into the DHT?
91 */
92#define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
93
94/**
95 * How long must content remain valid for us to consider it for migration?
96 * If content will expire too soon, there is clearly no point in pushing
97 * it to other peers. This value gives the threshold for migration. Note
98 * that if this value is increased, the migration testcase may need to be
99 * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
100 */
101#define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
102
103/**
104 * Inverse of the probability that we will submit the same query
105 * to the same peer again. If the same peer already got the query
106 * repeatedly recently, the probability is multiplied by the inverse
107 * of this number each time. Note that we only try about every TTL_DECREMENT/2
108 * plus MAX_CORK_DELAY (so roughly every 3.5s).
109 *
110 * Note that this factor is a key influence to performance in small
111 * networks (especially test networks of 2 peers) because if there is
112 * only a single peer with the data, this value will determine how
113 * soon we might re-try. For example, a value of 3 can result in
114 * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
115 * give us 5 MB/s. OTOH, obviously re-trying the same peer can be
116 * rather inefficient in larger networks, hence picking 1 is in
117 * general not the best choice.
118 *
119 * Performance measurements (for 50 MB file, 2 peers, no delays):
120 *
121 * - 1: 3300 kb/s (consistently)
122 * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s
123 * - 5: 759 kb/s, 968 kb/s, 1160 kb/s
124 *
125 * Note that this does NOT mean that the value should be 1 since
126 * a 2-peer network is far from representative here (and this fails
127 * to take into consideration bandwidth wasted by repeatedly
128 * sending queries to peers that don't have the content). Also,
129 * it is expected that higher values lead to more inconsistent
130 * measurements since this only affects lost messages towards the
131 * end of the download.
132 *
133 * Finally, we should probably consider changing this and making
134 * it dependent on the number of connected peers or a related
135 * metric (bad magic constants...).
136 */
137#define RETRY_PROBABILITY_INV 1
138
139/**
140 * What is the maximum delay for a P2P FS message (in our interaction
141 * with core)? FS-internal delays are another story. The value is
142 * chosen based on the 32k block size. Given that peers typcially
143 * have at least 1 kb/s bandwidth, 45s waits give us a chance to
144 * transmit one message even to the lowest-bandwidth peers.
145 */
146#define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
147
148/**
149 * Maximum number of requests (from other peers, overall) that we're
150 * willing to have pending at any given point in time. Can be changed
151 * via the configuration file (32k is just the default).
152 */
153static unsigned long long max_pending_requests = (32 * 1024);
154
155/**
156 * Information we keep for each pending reply. The
157 * actual message follows at the end of this struct.
158 */
159struct PendingMessage;
160
161/**
162 * Function called upon completion of a transmission.
163 *
164 * @param cls closure
165 * @param pid ID of receiving peer, 0 on transmission error
166 */
167typedef void (*TransmissionContinuation)(void * cls,
168 GNUNET_PEER_Id tpid);
169
170
171/**
172 * Information we keep for each pending message (GET/PUT). The
173 * actual message follows at the end of this struct.
174 */
175struct PendingMessage
176{
177 /**
178 * This is a doubly-linked list of messages to the same peer.
179 */
180 struct PendingMessage *next;
181
182 /**
183 * This is a doubly-linked list of messages to the same peer.
184 */
185 struct PendingMessage *prev;
186
187 /**
188 * Entry in pending message list for this pending message.
189 */
190 struct PendingMessageList *pml;
191
192 /**
193 * Function to call immediately once we have transmitted this
194 * message.
195 */
196 TransmissionContinuation cont;
197
198 /**
199 * Closure for cont.
200 */
201 void *cont_cls;
202
203 /**
204 * Do not transmit this pending message until this deadline.
205 */
206 struct GNUNET_TIME_Absolute delay_until;
207
208 /**
209 * Size of the reply; actual reply message follows
210 * at the end of this struct.
211 */
212 size_t msize;
213
214 /**
215 * How important is this message for us?
216 */
217 uint32_t priority;
218
219};
220
221
222/**
223 * Information about a peer that we are connected to.
224 * We track data that is useful for determining which
225 * peers should receive our requests. We also keep
226 * a list of messages to transmit to this peer.
227 */
228struct ConnectedPeer
229{
230
231 /**
232 * List of the last clients for which this peer successfully
233 * answered a query.
234 */
235 struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
236
237 /**
238 * List of the last PIDs for which
239 * this peer successfully answered a query;
240 * We use 0 to indicate no successful reply.
241 */
242 GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
243
244 /**
245 * Average delay between sending the peer a request and
246 * getting a reply (only calculated over the requests for
247 * which we actually got a reply). Calculated
248 * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
249 */
250 struct GNUNET_TIME_Relative avg_delay;
251
252 /**
253 * Point in time until which this peer does not want us to migrate content
254 * to it.
255 */
256 struct GNUNET_TIME_Absolute migration_blocked;
257
258 /**
259 * Time until when we blocked this peer from migrating
260 * data to us.
261 */
262 struct GNUNET_TIME_Absolute last_migration_block;
263
264 /**
265 * Transmission times for the last MAX_QUEUE_PER_PEER
266 * requests for this peer. Used as a ring buffer, current
267 * offset is stored in 'last_request_times_off'. If the
268 * oldest entry is more recent than the 'avg_delay', we should
269 * not send any more requests right now.
270 */
271 struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
272
273 /**
274 * Handle for an active request for transmission to this
275 * peer, or NULL.
276 */
277 struct GNUNET_CORE_TransmitHandle *cth;
278
279 /**
280 * Messages (replies, queries, content migration) we would like to
281 * send to this peer in the near future. Sorted by priority, head.
282 */
283 struct PendingMessage *pending_messages_head;
284
285 /**
286 * Messages (replies, queries, content migration) we would like to
287 * send to this peer in the near future. Sorted by priority, tail.
288 */
289 struct PendingMessage *pending_messages_tail;
290
291 /**
292 * How long does it typically take for us to transmit a message
293 * to this peer? (delay between the request being issued and
294 * the callback being invoked).
295 */
296 struct GNUNET_LOAD_Value *transmission_delay;
297
298 /**
299 * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
300 */
301 struct GNUNET_CORE_InformationRequestContext *irc;
302
303 /**
304 * Request for which 'irc' is currently active (or NULL).
305 */
306 struct PendingRequest *pr;
307
308 /**
309 * Time when the last transmission request was issued.
310 */
311 struct GNUNET_TIME_Absolute last_transmission_request_start;
312
313 /**
314 * ID of delay task for scheduling transmission.
315 */
316 GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
317
318 /**
319 * Average priority of successful replies. Calculated
320 * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
321 */
322 double avg_priority;
323
324 /**
325 * Increase in traffic preference still to be submitted
326 * to the core service for this peer.
327 */
328 uint64_t inc_preference;
329
330 /**
331 * Trust rating for this peer
332 */
333 uint32_t trust;
334
335 /**
336 * Trust rating for this peer on disk.
337 */
338 uint32_t disk_trust;
339
340 /**
341 * The peer's identity.
342 */
343 GNUNET_PEER_Id pid;
344
345 /**
346 * Size of the linked list of 'pending_messages'.
347 */
348 unsigned int pending_requests;
349
350 /**
351 * Which offset in "last_p2p_replies" will be updated next?
352 * (we go round-robin).
353 */
354 unsigned int last_p2p_replies_woff;
355
356 /**
357 * Which offset in "last_client_replies" will be updated next?
358 * (we go round-robin).
359 */
360 unsigned int last_client_replies_woff;
361
362 /**
363 * Current offset into 'last_request_times' ring buffer.
364 */
365 unsigned int last_request_times_off;
366
367};
368
369
370/**
371 * Information we keep for each pending request. We should try to
372 * keep this struct as small as possible since its memory consumption
373 * is key to how many requests we can have pending at once.
374 */
375struct PendingRequest;
376
377
378/**
379 * Doubly-linked list of requests we are performing
380 * on behalf of the same client.
381 */
382struct ClientRequestList
383{
384
385 /**
386 * This is a doubly-linked list.
387 */
388 struct ClientRequestList *next;
389
390 /**
391 * This is a doubly-linked list.
392 */
393 struct ClientRequestList *prev;
394
395 /**
396 * Request this entry represents.
397 */
398 struct PendingRequest *req;
399
400 /**
401 * Client list this request belongs to.
402 */
403 struct ClientList *client_list;
404
405};
406
407
408/**
409 * Replies to be transmitted to the client. The actual
410 * response message is allocated after this struct.
411 */
412struct ClientResponseMessage
413{
414 /**
415 * This is a doubly-linked list.
416 */
417 struct ClientResponseMessage *next;
418
419 /**
420 * This is a doubly-linked list.
421 */
422 struct ClientResponseMessage *prev;
423
424 /**
425 * Client list entry this response belongs to.
426 */
427 struct ClientList *client_list;
428
429 /**
430 * Number of bytes in the response.
431 */
432 size_t msize;
433};
434
435
436/**
437 * Linked list of clients we are performing requests
438 * for right now.
439 */
440struct ClientList
441{
442 /**
443 * This is a linked list.
444 */
445 struct ClientList *next;
446
447 /**
448 * ID of a client.
449 */
450 struct GNUNET_SERVER_Client *client;
451
452 /**
453 * Head of list of requests performed on behalf
454 * of this client right now.
455 */
456 struct ClientRequestList *rl_head;
457
458 /**
459 * Tail of list of requests performed on behalf
460 * of this client right now.
461 */
462 struct ClientRequestList *rl_tail;
463
464 /**
465 * Head of linked list of responses.
466 */
467 struct ClientResponseMessage *res_head;
468
469 /**
470 * Tail of linked list of responses.
471 */
472 struct ClientResponseMessage *res_tail;
473
474 /**
475 * Context for sending replies.
476 */
477 struct GNUNET_CONNECTION_TransmitHandle *th;
478
479};
480
481
482/**
483 * Information about a peer that we have forwarded this
484 * request to already.
485 */
486struct UsedTargetEntry
487{
488 /**
489 * What was the last time we have transmitted this request to this
490 * peer?
491 */
492 struct GNUNET_TIME_Absolute last_request_time;
493
494 /**
495 * How often have we transmitted this request to this peer?
496 */
497 unsigned int num_requests;
498
499 /**
500 * PID of the target peer.
501 */
502 GNUNET_PEER_Id pid;
503
504};
505
506
507/**
508 * Doubly-linked list of messages we are performing
509 * due to a pending request.
510 */
511struct PendingMessageList
512{
513
514 /**
515 * This is a doubly-linked list of messages on behalf of the same request.
516 */
517 struct PendingMessageList *next;
518
519 /**
520 * This is a doubly-linked list of messages on behalf of the same request.
521 */
522 struct PendingMessageList *prev;
523
524 /**
525 * Message this entry represents.
526 */
527 struct PendingMessage *pm;
528
529 /**
530 * Request this entry belongs to.
531 */
532 struct PendingRequest *req;
533
534 /**
535 * Peer this message is targeted for.
536 */
537 struct ConnectedPeer *target;
538
539};
540
541
542/**
543 * Information we keep for each pending request. We should try to
544 * keep this struct as small as possible since its memory consumption
545 * is key to how many requests we can have pending at once.
546 */
547struct PendingRequest
548{
549
550 /**
551 * If this request was made by a client, this is our entry in the
552 * client request list; otherwise NULL.
553 */
554 struct ClientRequestList *client_request_list;
555
556 /**
557 * Entry of peer responsible for this entry (if this request
558 * was made by a peer).
559 */
560 struct ConnectedPeer *cp;
561
562 /**
563 * If this is a namespace query, pointer to the hash of the public
564 * key of the namespace; otherwise NULL. Pointer will be to the
565 * end of this struct (so no need to free it).
566 */
567 const GNUNET_HashCode *namespace;
568
569 /**
570 * Bloomfilter we use to filter out replies that we don't care about
571 * (anymore). NULL as long as we are interested in all replies.
572 */
573 struct GNUNET_CONTAINER_BloomFilter *bf;
574
575 /**
576 * Reference to DHT get operation for this request (or NULL).
577 */
578 struct GNUNET_DHT_GetHandle *dht_get;
579
580 /**
581 * Context of our GNUNET_CORE_peer_change_preference call.
582 */
583 struct ConnectedPeer *pirc;
584
585 /**
586 * Hash code of all replies that we have seen so far (only valid
587 * if client is not NULL since we only track replies like this for
588 * our own clients).
589 */
590 GNUNET_HashCode *replies_seen;
591
592 /**
593 * Node in the heap representing this entry; NULL
594 * if we have no heap node.
595 */
596 struct GNUNET_CONTAINER_HeapNode *hnode;
597
598 /**
599 * Head of list of messages being performed on behalf of this
600 * request.
601 */
602 struct PendingMessageList *pending_head;
603
604 /**
605 * Tail of list of messages being performed on behalf of this
606 * request.
607 */
608 struct PendingMessageList *pending_tail;
609
610 /**
611 * When did we first see this request (form this peer), or, if our
612 * client is initiating, when did we last initiate a search?
613 */
614 struct GNUNET_TIME_Absolute start_time;
615
616 /**
617 * The query that this request is for.
618 */
619 GNUNET_HashCode query;
620
621 /**
622 * The task responsible for transmitting queries
623 * for this request.
624 */
625 GNUNET_SCHEDULER_TaskIdentifier task;
626
627 /**
628 * (Interned) Peer identifier that identifies a preferred target
629 * for requests.
630 */
631 GNUNET_PEER_Id target_pid;
632
633 /**
634 * (Interned) Peer identifiers of peers that have already
635 * received our query for this content.
636 */
637 struct UsedTargetEntry *used_targets;
638
639 /**
640 * Our entry in the queue (non-NULL while we wait for our
641 * turn to interact with the local database).
642 */
643 struct GNUNET_DATASTORE_QueueEntry *qe;
644
645 /**
646 * Size of the 'bf' (in bytes).
647 */
648 size_t bf_size;
649
650 /**
651 * Desired anonymity level; only valid for requests from a local client.
652 */
653 uint32_t anonymity_level;
654
655 /**
656 * How many entries in "used_targets" are actually valid?
657 */
658 unsigned int used_targets_off;
659
660 /**
661 * How long is the "used_targets" array?
662 */
663 unsigned int used_targets_size;
664 68
665 /** 69/* ****************************** globals ****************************** */
666 * Number of results found for this request.
667 */
668 unsigned int results_found;
669
670 /**
671 * How many entries in "replies_seen" are actually valid?
672 */
673 unsigned int replies_seen_off;
674
675 /**
676 * How long is the "replies_seen" array?
677 */
678 unsigned int replies_seen_size;
679
680 /**
681 * Priority with which this request was made. If one of our clients
682 * made the request, then this is the current priority that we are
683 * using when initiating the request. This value is used when
684 * we decide to reward other peers with trust for providing a reply.
685 */
686 uint32_t priority;
687
688 /**
689 * Priority points left for us to spend when forwarding this request
690 * to other peers.
691 */
692 uint32_t remaining_priority;
693
694 /**
695 * Number to mingle hashes for bloom-filter tests with.
696 */
697 int32_t mingle;
698
699 /**
700 * TTL with which we saw this request (or, if we initiated, TTL that
701 * we used for the request).
702 */
703 int32_t ttl;
704
705 /**
706 * Type of the content that this request is for.
707 */
708 enum GNUNET_BLOCK_Type type;
709
710 /**
711 * Remove this request after transmission of the current response.
712 */
713 int8_t do_remove;
714
715 /**
716 * GNUNET_YES if we should not forward this request to other peers.
717 */
718 int8_t local_only;
719
720 /**
721 * GNUNET_YES if we should not forward this request to other peers. (HUH?)
722 */
723 int8_t forward_only;
724
725};
726
727
728/**
729 * Block that is ready for migration to other peers. Actual data is at the end of the block.
730 */
731struct MigrationReadyBlock
732{
733
734 /**
735 * This is a doubly-linked list.
736 */
737 struct MigrationReadyBlock *next;
738
739 /**
740 * This is a doubly-linked list.
741 */
742 struct MigrationReadyBlock *prev;
743
744 /**
745 * Query for the block.
746 */
747 GNUNET_HashCode query;
748
749 /**
750 * When does this block expire?
751 */
752 struct GNUNET_TIME_Absolute expiration;
753
754 /**
755 * Peers we would consider forwarding this
756 * block to. Zero for empty entries.
757 */
758 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
759
760 /**
761 * Size of the block.
762 */
763 size_t size;
764
765 /**
766 * Number of targets already used.
767 */
768 unsigned int used_targets;
769
770 /**
771 * Type of the block.
772 */
773 enum GNUNET_BLOCK_Type type;
774};
775
776/**
777 * Identity of this peer.
778 */
779static struct GNUNET_PeerIdentity my_id;
780 70
781/** 71/**
782 * Our connection to the datastore. 72 * Our connection to the datastore.
783 */ 73 */
784static struct GNUNET_DATASTORE_Handle *dsh; 74struct GNUNET_DATASTORE_Handle *GSF_dsh;
785
786/**
787 * Our block context.
788 */
789static struct GNUNET_BLOCK_Context *block_ctx;
790
791/**
792 * Our block configuration.
793 */
794static struct GNUNET_CONFIGURATION_Handle *block_cfg;
795 75
796/** 76/**
797 * Our configuration. 77 * Our configuration.
798 */ 78 */
799static const struct GNUNET_CONFIGURATION_Handle *cfg; 79const struct GNUNET_CONFIGURATION_Handle *GSF_cfg;
800
801/**
802 * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
803 */
804static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
805
806/**
807 * Map of peer identifiers to "struct PendingRequest" (for that peer).
808 */
809static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
810
811/**
812 * Map of query identifiers to "struct PendingRequest" (for that query).
813 */
814static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
815
816/**
817 * Heap with the request that will expire next at the top. Contains
818 * pointers of type "struct PendingRequest*"; these will *also* be
819 * aliased from the "requests_by_peer" data structures and the
820 * "requests_by_query" table. Note that requests from our clients
821 * don't expire and are thus NOT in the "requests_by_expiration"
822 * (or the "requests_by_peer" tables).
823 */
824static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
825 80
826/** 81/**
827 * Handle for reporting statistics. 82 * Handle for reporting statistics.
828 */ 83 */
829static struct GNUNET_STATISTICS_Handle *stats; 84struct GNUNET_STATISTICS_Handle *GSF_stats;
830
831/**
832 * Linked list of clients we are currently processing requests for.
833 */
834static struct ClientList *client_list;
835
836/**
837 * Pointer to handle to the core service (points to NULL until we've
838 * connected to it).
839 */
840static struct GNUNET_CORE_Handle *core;
841
842/**
843 * Head of linked list of blocks that can be migrated.
844 */
845static struct MigrationReadyBlock *mig_head;
846
847/**
848 * Tail of linked list of blocks that can be migrated.
849 */
850static struct MigrationReadyBlock *mig_tail;
851
852/**
853 * Request to datastore for migration (or NULL).
854 */
855static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
856
857/**
858 * Request to datastore for DHT PUTs (or NULL).
859 */
860static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
861
862/**
863 * Type we will request for the next DHT PUT round from the datastore.
864 */
865static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
866
867/**
868 * Where do we store trust information?
869 */
870static char *trustDirectory;
871
872/**
873 * ID of task that collects blocks for migration.
874 */
875static GNUNET_SCHEDULER_TaskIdentifier mig_task;
876
877/**
878 * ID of task that collects blocks for DHT PUTs.
879 */
880static GNUNET_SCHEDULER_TaskIdentifier dht_task;
881
882/**
883 * What is the maximum frequency at which we are allowed to
884 * poll the datastore for migration content?
885 */
886static struct GNUNET_TIME_Relative min_migration_delay;
887 85
888/** 86/**
889 * Handle for DHT operations. 87 * Handle for DHT operations.
890 */ 88 */
891static struct GNUNET_DHT_Handle *dht_handle; 89struct GNUNET_DHT_Handle *GSF_dht;
892
893/**
894 * Size of the doubly-linked list of migration blocks.
895 */
896static unsigned int mig_size;
897
898/**
899 * Are we allowed to migrate content to this peer.
900 */
901static int active_to_migration;
902 90
903/** 91/**
904 * Are we allowed to push out content from this peer. 92 * How long do requests typically stay in the routing table?
905 */
906static int active_from_migration;
907
908/**
909 * How many entires with zero anonymity do we currently estimate
910 * to have in the database?
911 */ 93 */
912static unsigned int zero_anonymity_count_estimate; 94struct GNUNET_LOAD_Value *GSF_rt_entry_lifetime;
913 95
914/** 96/**
915 * Typical priorities we're seeing from other peers right now. Since 97 * Typical priorities we're seeing from other peers right now. Since
@@ -921,1509 +103,87 @@ static unsigned int zero_anonymity_count_estimate;
921 * receiving the largest possible priority can still only raise our 103 * receiving the largest possible priority can still only raise our
922 * "current_priorities" by at most 1. 104 * "current_priorities" by at most 1.
923 */ 105 */
924static double current_priorities; 106double GSF_current_priorities;
925
926/**
927 * Datastore 'GET' load tracking.
928 */
929static struct GNUNET_LOAD_Value *datastore_get_load;
930
931/**
932 * Datastore 'PUT' load tracking.
933 */
934static struct GNUNET_LOAD_Value *datastore_put_load;
935
936/**
937 * How long do requests typically stay in the routing table?
938 */
939static struct GNUNET_LOAD_Value *rt_entry_lifetime;
940 107
941/** 108/**
942 * How many query messages have we received 'recently' that 109 * How many query messages have we received 'recently' that
943 * have not yet been claimed as cover traffic? 110 * have not yet been claimed as cover traffic?
944 */ 111 */
945static unsigned int cover_query_count; 112unsigned int GSF_cover_query_count;
946 113
947/** 114/**
948 * How many content messages have we received 'recently' that 115 * How many content messages have we received 'recently' that
949 * have not yet been claimed as cover traffic? 116 * have not yet been claimed as cover traffic?
950 */ 117 */
951static unsigned int cover_content_count; 118unsigned int GSF_cover_content_count;
952
953/**
954 * ID of our task that we use to age the cover counters.
955 */
956static GNUNET_SCHEDULER_TaskIdentifier cover_age_task;
957
958
959static void
960age_cover_counters (void *cls,
961 const struct GNUNET_SCHEDULER_TaskContext *tc)
962{
963 cover_content_count = (cover_content_count * 15) / 16;
964 cover_query_count = (cover_query_count * 15) / 16;
965 cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
966 &age_cover_counters,
967 NULL);
968}
969
970/**
971 * We've just now completed a datastore request. Update our
972 * datastore load calculations.
973 *
974 * @param start time when the datastore request was issued
975 */
976static void
977update_datastore_delays (struct GNUNET_TIME_Absolute start)
978{
979 struct GNUNET_TIME_Relative delay;
980
981 delay = GNUNET_TIME_absolute_get_duration (start);
982 GNUNET_LOAD_update (datastore_get_load,
983 delay.rel_value);
984}
985
986
987/**
988 * Get the filename under which we would store the GNUNET_HELLO_Message
989 * for the given host and protocol.
990 * @return filename of the form DIRECTORY/HOSTID
991 */
992static char *
993get_trust_filename (const struct GNUNET_PeerIdentity *id)
994{
995 struct GNUNET_CRYPTO_HashAsciiEncoded fil;
996 char *fn;
997
998 GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
999 GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
1000 return fn;
1001}
1002
1003
1004
1005/**
1006 * Transmit messages by copying it to the target buffer
1007 * "buf". "buf" will be NULL and "size" zero if the socket was closed
1008 * for writing in the meantime. In that case, do nothing
1009 * (the disconnect or shutdown handler will take care of the rest).
1010 * If we were able to transmit messages and there are still more
1011 * pending, ask core again for further calls to this function.
1012 *
1013 * @param cls closure, pointer to the 'struct ConnectedPeer*'
1014 * @param size number of bytes available in buf
1015 * @param buf where the callee should write the message
1016 * @return number of bytes written to buf
1017 */
1018static size_t
1019transmit_to_peer (void *cls,
1020 size_t size, void *buf);
1021
1022
1023/* ******************* clean up functions ************************ */
1024
1025/**
1026 * Delete the given migration block.
1027 *
1028 * @param mb block to delete
1029 */
1030static void
1031delete_migration_block (struct MigrationReadyBlock *mb)
1032{
1033 GNUNET_CONTAINER_DLL_remove (mig_head,
1034 mig_tail,
1035 mb);
1036 GNUNET_PEER_decrement_rcs (mb->target_list,
1037 MIGRATION_LIST_SIZE);
1038 mig_size--;
1039 GNUNET_free (mb);
1040}
1041
1042 119
1043/** 120/**
1044 * Compare the distance of two peers to a key. 121 * Our block context.
1045 *
1046 * @param key key
1047 * @param p1 first peer
1048 * @param p2 second peer
1049 * @return GNUNET_YES if P1 is closer to key than P2
1050 */
1051static int
1052is_closer (const GNUNET_HashCode *key,
1053 const struct GNUNET_PeerIdentity *p1,
1054 const struct GNUNET_PeerIdentity *p2)
1055{
1056 return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
1057 &p2->hashPubKey,
1058 key);
1059}
1060
1061
1062/**
1063 * Consider migrating content to a given peer.
1064 *
1065 * @param cls 'struct MigrationReadyBlock*' to select
1066 * targets for (or NULL for none)
1067 * @param key ID of the peer
1068 * @param value 'struct ConnectedPeer' of the peer
1069 * @return GNUNET_YES (always continue iteration)
1070 */
1071static int
1072consider_migration (void *cls,
1073 const GNUNET_HashCode *key,
1074 void *value)
1075{
1076 struct MigrationReadyBlock *mb = cls;
1077 struct ConnectedPeer *cp = value;
1078 struct MigrationReadyBlock *pos;
1079 struct GNUNET_PeerIdentity cppid;
1080 struct GNUNET_PeerIdentity otherpid;
1081 struct GNUNET_PeerIdentity worstpid;
1082 size_t msize;
1083 unsigned int i;
1084 unsigned int repl;
1085
1086 /* consider 'cp' as a migration target for mb */
1087 if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
1088 return GNUNET_YES; /* peer has requested no migration! */
1089 if (mb != NULL)
1090 {
1091 GNUNET_PEER_resolve (cp->pid,
1092 &cppid);
1093 repl = MIGRATION_LIST_SIZE;
1094 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1095 {
1096 if (mb->target_list[i] == 0)
1097 {
1098 mb->target_list[i] = cp->pid;
1099 GNUNET_PEER_change_rc (mb->target_list[i], 1);
1100 repl = MIGRATION_LIST_SIZE;
1101 break;
1102 }
1103 GNUNET_PEER_resolve (mb->target_list[i],
1104 &otherpid);
1105 if ( (repl == MIGRATION_LIST_SIZE) &&
1106 is_closer (&mb->query,
1107 &cppid,
1108 &otherpid))
1109 {
1110 repl = i;
1111 worstpid = otherpid;
1112 }
1113 else if ( (repl != MIGRATION_LIST_SIZE) &&
1114 (is_closer (&mb->query,
1115 &worstpid,
1116 &otherpid) ) )
1117 {
1118 repl = i;
1119 worstpid = otherpid;
1120 }
1121 }
1122 if (repl != MIGRATION_LIST_SIZE)
1123 {
1124 GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1125 mb->target_list[repl] = cp->pid;
1126 GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1127 }
1128 }
1129
1130 /* consider scheduling transmission to cp for content migration */
1131 if (cp->cth != NULL)
1132 return GNUNET_YES;
1133 msize = 0;
1134 pos = mig_head;
1135 while (pos != NULL)
1136 {
1137 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1138 {
1139 if (cp->pid == pos->target_list[i])
1140 {
1141 if (msize == 0)
1142 msize = pos->size;
1143 else
1144 msize = GNUNET_MIN (msize,
1145 pos->size);
1146 break;
1147 }
1148 }
1149 pos = pos->next;
1150 }
1151 if (msize == 0)
1152 return GNUNET_YES; /* no content available */
1153#if DEBUG_FS
1154 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1155 "Trying to migrate at least %u bytes to peer `%s'\n",
1156 msize,
1157 GNUNET_h2s (key));
1158#endif
1159 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1160 {
1161 GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1162 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1163 }
1164 cp->cth
1165 = GNUNET_CORE_notify_transmit_ready (core,
1166 GNUNET_YES,
1167 0, GNUNET_TIME_UNIT_FOREVER_REL,
1168 (const struct GNUNET_PeerIdentity*) key,
1169 msize + sizeof (struct PutMessage),
1170 &transmit_to_peer,
1171 cp);
1172 return GNUNET_YES;
1173}
1174
1175
1176/**
1177 * Task that is run periodically to obtain blocks for content
1178 * migration
1179 *
1180 * @param cls unused
1181 * @param tc scheduler context (also unused)
1182 */
1183static void
1184gather_migration_blocks (void *cls,
1185 const struct GNUNET_SCHEDULER_TaskContext *tc);
1186
1187
1188
1189
1190/**
1191 * Task that is run periodically to obtain blocks for DHT PUTs.
1192 *
1193 * @param cls type of blocks to gather
1194 * @param tc scheduler context (unused)
1195 */
1196static void
1197gather_dht_put_blocks (void *cls,
1198 const struct GNUNET_SCHEDULER_TaskContext *tc);
1199
1200
1201/**
1202 * If the migration task is not currently running, consider
1203 * (re)scheduling it with the appropriate delay.
1204 */
1205static void
1206consider_migration_gathering ()
1207{
1208 struct GNUNET_TIME_Relative delay;
1209
1210 if (dsh == NULL)
1211 return;
1212 if (mig_qe != NULL)
1213 return;
1214 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1215 return;
1216 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1217 mig_size);
1218 delay = GNUNET_TIME_relative_divide (delay,
1219 MAX_MIGRATION_QUEUE);
1220 delay = GNUNET_TIME_relative_max (delay,
1221 min_migration_delay);
1222 mig_task = GNUNET_SCHEDULER_add_delayed (delay,
1223 &gather_migration_blocks,
1224 NULL);
1225}
1226
1227
1228/**
1229 * If the DHT PUT gathering task is not currently running, consider
1230 * (re)scheduling it with the appropriate delay.
1231 */
1232static void
1233consider_dht_put_gathering (void *cls)
1234{
1235 struct GNUNET_TIME_Relative delay;
1236
1237 if (dsh == NULL)
1238 return;
1239 if (dht_qe != NULL)
1240 return;
1241 if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1242 return;
1243 if (zero_anonymity_count_estimate > 0)
1244 {
1245 delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1246 zero_anonymity_count_estimate);
1247 delay = GNUNET_TIME_relative_min (delay,
1248 MAX_DHT_PUT_FREQ);
1249 }
1250 else
1251 {
1252 /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1253 (hopefully) appear */
1254 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1255 }
1256 dht_task = GNUNET_SCHEDULER_add_delayed (delay,
1257 &gather_dht_put_blocks,
1258 cls);
1259}
1260
1261
1262/**
1263 * Process content offered for migration.
1264 *
1265 * @param cls closure
1266 * @param key key for the content
1267 * @param size number of bytes in data
1268 * @param data content stored
1269 * @param type type of the content
1270 * @param priority priority of the content
1271 * @param anonymity anonymity-level for the content
1272 * @param expiration expiration time for the content
1273 * @param uid unique identifier for the datum;
1274 * maybe 0 if no unique identifier is available
1275 */
1276static void
1277process_migration_content (void *cls,
1278 const GNUNET_HashCode * key,
1279 size_t size,
1280 const void *data,
1281 enum GNUNET_BLOCK_Type type,
1282 uint32_t priority,
1283 uint32_t anonymity,
1284 struct GNUNET_TIME_Absolute
1285 expiration, uint64_t uid)
1286{
1287 struct MigrationReadyBlock *mb;
1288
1289 if (key == NULL)
1290 {
1291 mig_qe = NULL;
1292 if (mig_size < MAX_MIGRATION_QUEUE)
1293 consider_migration_gathering ();
1294 return;
1295 }
1296 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
1297 MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
1298 {
1299 /* content will expire soon, don't bother */
1300 GNUNET_DATASTORE_iterate_get_next (dsh);
1301 return;
1302 }
1303 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1304 {
1305 if (GNUNET_OK !=
1306 GNUNET_FS_handle_on_demand_block (key, size, data,
1307 type, priority, anonymity,
1308 expiration, uid,
1309 &process_migration_content,
1310 NULL))
1311 {
1312 GNUNET_DATASTORE_iterate_get_next (dsh);
1313 }
1314 return;
1315 }
1316#if DEBUG_FS
1317 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1318 "Retrieved block `%s' of type %u for migration\n",
1319 GNUNET_h2s (key),
1320 type);
1321#endif
1322 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1323 mb->query = *key;
1324 mb->expiration = expiration;
1325 mb->size = size;
1326 mb->type = type;
1327 memcpy (&mb[1], data, size);
1328 GNUNET_CONTAINER_DLL_insert_after (mig_head,
1329 mig_tail,
1330 mig_tail,
1331 mb);
1332 mig_size++;
1333 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1334 &consider_migration,
1335 mb);
1336 GNUNET_DATASTORE_iterate_get_next (dsh);
1337}
1338
1339
1340/**
1341 * Function called upon completion of the DHT PUT operation.
1342 */
1343static void
1344dht_put_continuation (void *cls,
1345 const struct GNUNET_SCHEDULER_TaskContext *tc)
1346{
1347 GNUNET_DATASTORE_iterate_get_next (dsh);
1348}
1349
1350
1351/**
1352 * Store content in DHT.
1353 *
1354 * @param cls closure
1355 * @param key key for the content
1356 * @param size number of bytes in data
1357 * @param data content stored
1358 * @param type type of the content
1359 * @param priority priority of the content
1360 * @param anonymity anonymity-level for the content
1361 * @param expiration expiration time for the content
1362 * @param uid unique identifier for the datum;
1363 * maybe 0 if no unique identifier is available
1364 */
1365static void
1366process_dht_put_content (void *cls,
1367 const GNUNET_HashCode * key,
1368 size_t size,
1369 const void *data,
1370 enum GNUNET_BLOCK_Type type,
1371 uint32_t priority,
1372 uint32_t anonymity,
1373 struct GNUNET_TIME_Absolute
1374 expiration, uint64_t uid)
1375{
1376 static unsigned int counter;
1377 static GNUNET_HashCode last_vhash;
1378 static GNUNET_HashCode vhash;
1379
1380 if (key == NULL)
1381 {
1382 dht_qe = NULL;
1383 consider_dht_put_gathering (cls);
1384 return;
1385 }
1386 /* slightly funky code to estimate the total number of values with zero
1387 anonymity from the maximum observed length of a monotonically increasing
1388 sequence of hashes over the contents */
1389 GNUNET_CRYPTO_hash (data, size, &vhash);
1390 if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1391 {
1392 if (zero_anonymity_count_estimate > 0)
1393 zero_anonymity_count_estimate /= 2;
1394 counter = 0;
1395 }
1396 last_vhash = vhash;
1397 if (counter < 31)
1398 counter++;
1399 if (zero_anonymity_count_estimate < (1 << counter))
1400 zero_anonymity_count_estimate = (1 << counter);
1401#if DEBUG_FS
1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1403 "Retrieved block `%s' of type %u for DHT PUT\n",
1404 GNUNET_h2s (key),
1405 type);
1406#endif
1407 GNUNET_DHT_put (dht_handle,
1408 key,
1409 DEFAULT_PUT_REPLICATION,
1410 GNUNET_DHT_RO_NONE,
1411 type,
1412 size,
1413 data,
1414 expiration,
1415 GNUNET_TIME_UNIT_FOREVER_REL,
1416 &dht_put_continuation,
1417 cls);
1418}
1419
1420
1421/**
1422 * Task that is run periodically to obtain blocks for content
1423 * migration
1424 *
1425 * @param cls unused
1426 * @param tc scheduler context (also unused)
1427 */
1428static void
1429gather_migration_blocks (void *cls,
1430 const struct GNUNET_SCHEDULER_TaskContext *tc)
1431{
1432 mig_task = GNUNET_SCHEDULER_NO_TASK;
1433 if (dsh != NULL)
1434 {
1435 mig_qe = GNUNET_DATASTORE_get_for_replication (dsh, 0, UINT_MAX,
1436 GNUNET_TIME_UNIT_FOREVER_REL,
1437 &process_migration_content, NULL);
1438 GNUNET_assert (mig_qe != NULL);
1439 }
1440}
1441
1442
1443/**
1444 * Task that is run periodically to obtain blocks for DHT PUTs.
1445 *
1446 * @param cls type of blocks to gather
1447 * @param tc scheduler context (unused)
1448 */
1449static void
1450gather_dht_put_blocks (void *cls,
1451 const struct GNUNET_SCHEDULER_TaskContext *tc)
1452{
1453 dht_task = GNUNET_SCHEDULER_NO_TASK;
1454 if (dsh != NULL)
1455 {
1456 if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1457 dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1458 dht_qe = GNUNET_DATASTORE_iterate_zero_anonymity (dsh, 0, UINT_MAX,
1459 GNUNET_TIME_UNIT_FOREVER_REL,
1460 dht_put_type++,
1461 &process_dht_put_content, NULL);
1462 GNUNET_assert (dht_qe != NULL);
1463 }
1464}
1465
1466
1467/**
1468 * We're done with a particular message list entry.
1469 * Free all associated resources.
1470 *
1471 * @param pml entry to destroy
1472 */
1473static void
1474destroy_pending_message_list_entry (struct PendingMessageList *pml)
1475{
1476 GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1477 pml->req->pending_tail,
1478 pml);
1479 GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1480 pml->target->pending_messages_tail,
1481 pml->pm);
1482 GNUNET_assert (pml->target->pending_requests > 0);
1483 pml->target->pending_requests--;
1484 GNUNET_free (pml->pm);
1485 GNUNET_free (pml);
1486}
1487
1488
1489/**
1490 * Destroy the given pending message (and call the respective
1491 * continuation).
1492 *
1493 * @param pm message to destroy
1494 * @param tpid id of peer that the message was delivered to, or 0 for none
1495 */
1496static void
1497destroy_pending_message (struct PendingMessage *pm,
1498 GNUNET_PEER_Id tpid)
1499{
1500 struct PendingMessageList *pml = pm->pml;
1501 TransmissionContinuation cont;
1502 void *cont_cls;
1503
1504 cont = pm->cont;
1505 cont_cls = pm->cont_cls;
1506 if (pml != NULL)
1507 {
1508 GNUNET_assert (pml->pm == pm);
1509 GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1510 destroy_pending_message_list_entry (pml);
1511 }
1512 else
1513 {
1514 GNUNET_free (pm);
1515 }
1516 if (cont != NULL)
1517 cont (cont_cls, tpid);
1518}
1519
1520
1521/**
1522 * We're done processing a particular request.
1523 * Free all associated resources.
1524 *
1525 * @param pr request to destroy
1526 */
1527static void
1528destroy_pending_request (struct PendingRequest *pr)
1529{
1530 struct GNUNET_PeerIdentity pid;
1531 unsigned int i;
1532
1533 if (pr->hnode != NULL)
1534 {
1535 GNUNET_CONTAINER_heap_remove_node (pr->hnode);
1536 pr->hnode = NULL;
1537 }
1538 if (NULL == pr->client_request_list)
1539 {
1540 GNUNET_STATISTICS_update (stats,
1541 gettext_noop ("# P2P searches active"),
1542 -1,
1543 GNUNET_NO);
1544 }
1545 else
1546 {
1547 GNUNET_STATISTICS_update (stats,
1548 gettext_noop ("# client searches active"),
1549 -1,
1550 GNUNET_NO);
1551 }
1552 if (GNUNET_YES ==
1553 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1554 &pr->query,
1555 pr))
1556 {
1557 GNUNET_LOAD_update (rt_entry_lifetime,
1558 GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
1559 }
1560 if (pr->qe != NULL)
1561 {
1562 GNUNET_DATASTORE_cancel (pr->qe);
1563 pr->qe = NULL;
1564 }
1565 if (pr->dht_get != NULL)
1566 {
1567 GNUNET_DHT_get_stop (pr->dht_get);
1568 pr->dht_get = NULL;
1569 }
1570 if (pr->client_request_list != NULL)
1571 {
1572 GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1573 pr->client_request_list->client_list->rl_tail,
1574 pr->client_request_list);
1575 GNUNET_free (pr->client_request_list);
1576 pr->client_request_list = NULL;
1577 }
1578 if (pr->cp != NULL)
1579 {
1580 GNUNET_PEER_resolve (pr->cp->pid,
1581 &pid);
1582 (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1583 &pid.hashPubKey,
1584 pr);
1585 pr->cp = NULL;
1586 }
1587 if (pr->bf != NULL)
1588 {
1589 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1590 pr->bf = NULL;
1591 }
1592 if (pr->pirc != NULL)
1593 {
1594 GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc);
1595 pr->pirc->irc = NULL;
1596 pr->pirc = NULL;
1597 }
1598 if (pr->replies_seen != NULL)
1599 {
1600 GNUNET_free (pr->replies_seen);
1601 pr->replies_seen = NULL;
1602 }
1603 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1604 {
1605 GNUNET_SCHEDULER_cancel (pr->task);
1606 pr->task = GNUNET_SCHEDULER_NO_TASK;
1607 }
1608 while (NULL != pr->pending_head)
1609 destroy_pending_message_list_entry (pr->pending_head);
1610 GNUNET_PEER_change_rc (pr->target_pid, -1);
1611 if (pr->used_targets != NULL)
1612 {
1613 for (i=0;i<pr->used_targets_off;i++)
1614 GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1615 GNUNET_free (pr->used_targets);
1616 pr->used_targets_off = 0;
1617 pr->used_targets_size = 0;
1618 pr->used_targets = NULL;
1619 }
1620 GNUNET_free (pr);
1621}
1622
1623
1624/**
1625 * Find latency information in 'atsi'.
1626 *
1627 * @param atsi performance data
1628 * @return connection latency
1629 */
1630static struct GNUNET_TIME_Relative
1631get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1632{
1633 if (atsi == NULL)
1634 return GNUNET_TIME_UNIT_SECONDS;
1635 while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
1636 (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
1637 atsi++;
1638 if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR)
1639 {
1640 GNUNET_break (0);
1641 /* how can we not have latency data? */
1642 return GNUNET_TIME_UNIT_SECONDS;
1643 }
1644 return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1645 ntohl (atsi->value));
1646}
1647
1648
1649/**
1650 * Method called whenever a given peer connects.
1651 *
1652 * @param cls closure, not used
1653 * @param peer peer identity this notification is about
1654 * @param atsi performance information
1655 */
1656static void
1657peer_connect_handler (void *cls,
1658 const struct
1659 GNUNET_PeerIdentity * peer,
1660 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1661{
1662 struct ConnectedPeer *cp;
1663 struct MigrationReadyBlock *pos;
1664 char *fn;
1665 uint32_t trust;
1666 struct GNUNET_TIME_Relative latency;
1667
1668 if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity)))
1669 return;
1670 latency = get_latency (atsi);
1671 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1672 &peer->hashPubKey);
1673 if (NULL != cp)
1674 {
1675 GNUNET_break (0);
1676 return;
1677 }
1678 cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1679 cp->transmission_delay = GNUNET_LOAD_value_init (latency);
1680 cp->pid = GNUNET_PEER_intern (peer);
1681
1682 fn = get_trust_filename (peer);
1683 if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1684 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1685 cp->disk_trust = cp->trust = ntohl (trust);
1686 GNUNET_free (fn);
1687
1688 GNUNET_break (GNUNET_OK ==
1689 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1690 &peer->hashPubKey,
1691 cp,
1692 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1693
1694 pos = mig_head;
1695 while (NULL != pos)
1696 {
1697 (void) consider_migration (pos, &peer->hashPubKey, cp);
1698 pos = pos->next;
1699 }
1700}
1701
1702
1703/**
1704 * Method called whenever a given peer has a status change.
1705 *
1706 * @param cls closure
1707 * @param peer peer identity this notification is about
1708 * @param bandwidth_in available amount of inbound bandwidth
1709 * @param bandwidth_out available amount of outbound bandwidth
1710 * @param timeout absolute time when this peer will time out
1711 * unless we see some further activity from it
1712 * @param atsi status information
1713 */
1714static void
1715peer_status_handler (void *cls,
1716 const struct
1717 GNUNET_PeerIdentity * peer,
1718 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1719 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1720 struct GNUNET_TIME_Absolute timeout,
1721 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1722{
1723 struct ConnectedPeer *cp;
1724 struct GNUNET_TIME_Relative latency;
1725
1726 latency = get_latency (atsi);
1727 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1728 &peer->hashPubKey);
1729 if (cp == NULL)
1730 {
1731 GNUNET_break (0);
1732 return;
1733 }
1734 GNUNET_LOAD_value_set_decline (cp->transmission_delay,
1735 latency);
1736}
1737
1738
1739
1740/**
1741 * Increase the host credit by a value.
1742 *
1743 * @param host which peer to change the trust value on
1744 * @param value is the int value by which the
1745 * host credit is to be increased or decreased
1746 * @returns the actual change in trust (positive or negative)
1747 */
1748static int
1749change_host_trust (struct ConnectedPeer *host, int value)
1750{
1751 if (value == 0)
1752 return 0;
1753 GNUNET_assert (host != NULL);
1754 if (value > 0)
1755 {
1756 if (host->trust + value < host->trust)
1757 {
1758 value = UINT32_MAX - host->trust;
1759 host->trust = UINT32_MAX;
1760 }
1761 else
1762 host->trust += value;
1763 }
1764 else
1765 {
1766 if (host->trust < -value)
1767 {
1768 value = -host->trust;
1769 host->trust = 0;
1770 }
1771 else
1772 host->trust += value;
1773 }
1774 return value;
1775}
1776
1777
1778/**
1779 * Write host-trust information to a file - flush the buffer entry!
1780 */
1781static int
1782flush_trust (void *cls,
1783 const GNUNET_HashCode *key,
1784 void *value)
1785{
1786 struct ConnectedPeer *host = value;
1787 char *fn;
1788 uint32_t trust;
1789 struct GNUNET_PeerIdentity pid;
1790
1791 if (host->trust == host->disk_trust)
1792 return GNUNET_OK; /* unchanged */
1793 GNUNET_PEER_resolve (host->pid,
1794 &pid);
1795 fn = get_trust_filename (&pid);
1796 if (host->trust == 0)
1797 {
1798 if ((0 != UNLINK (fn)) && (errno != ENOENT))
1799 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1800 GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1801 }
1802 else
1803 {
1804 trust = htonl (host->trust);
1805 if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust,
1806 sizeof(uint32_t),
1807 GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1808 | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1809 host->disk_trust = host->trust;
1810 }
1811 GNUNET_free (fn);
1812 return GNUNET_OK;
1813}
1814
1815/**
1816 * Call this method periodically to scan data/hosts for new hosts.
1817 */
1818static void
1819cron_flush_trust (void *cls,
1820 const struct GNUNET_SCHEDULER_TaskContext *tc)
1821{
1822
1823 if (NULL == connected_peers)
1824 return;
1825 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1826 &flush_trust,
1827 NULL);
1828 if (NULL == tc)
1829 return;
1830 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1831 return;
1832 GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1833}
1834
1835
1836/**
1837 * Free (each) request made by the peer.
1838 *
1839 * @param cls closure, points to peer that the request belongs to
1840 * @param key current key code
1841 * @param value value in the hash map
1842 * @return GNUNET_YES (we should continue to iterate)
1843 */ 122 */
1844static int 123struct GNUNET_BLOCK_Context *GSF_block_ctx;
1845destroy_request (void *cls,
1846 const GNUNET_HashCode * key,
1847 void *value)
1848{
1849 const struct GNUNET_PeerIdentity * peer = cls;
1850 struct PendingRequest *pr = value;
1851
1852 GNUNET_break (GNUNET_YES ==
1853 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1854 &peer->hashPubKey,
1855 pr));
1856 destroy_pending_request (pr);
1857 return GNUNET_YES;
1858}
1859
1860 124
1861/** 125/**
1862 * Method called whenever a peer disconnects. 126 * Pointer to handle to the core service (points to NULL until we've
1863 * 127 * connected to it).
1864 * @param cls closure, not used
1865 * @param peer peer identity this notification is about
1866 */ 128 */
1867static void 129struct GNUNET_CORE_Handle *GSF_core;
1868peer_disconnect_handler (void *cls,
1869 const struct
1870 GNUNET_PeerIdentity * peer)
1871{
1872 struct ConnectedPeer *cp;
1873 struct PendingMessage *pm;
1874 unsigned int i;
1875 struct MigrationReadyBlock *pos;
1876 struct MigrationReadyBlock *next;
1877 130
1878 if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity)))
1879 return;
1880 GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1881 &peer->hashPubKey,
1882 &destroy_request,
1883 (void*) peer);
1884 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1885 &peer->hashPubKey);
1886 if (cp == NULL)
1887 return;
1888 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1889 {
1890 if (NULL != cp->last_client_replies[i])
1891 {
1892 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1893 cp->last_client_replies[i] = NULL;
1894 }
1895 }
1896 GNUNET_break (GNUNET_YES ==
1897 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1898 &peer->hashPubKey,
1899 cp));
1900 if (cp->irc != NULL)
1901 {
1902 GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1903 cp->irc = NULL;
1904 cp->pr->pirc = NULL;
1905 cp->pr = NULL;
1906 }
1907
1908 /* remove this peer from migration considerations; schedule
1909 alternatives */
1910 next = mig_head;
1911 while (NULL != (pos = next))
1912 {
1913 next = pos->next;
1914 for (i=0;i<MIGRATION_LIST_SIZE;i++)
1915 {
1916 if (pos->target_list[i] == cp->pid)
1917 {
1918 GNUNET_PEER_change_rc (pos->target_list[i], -1);
1919 pos->target_list[i] = 0;
1920 }
1921 }
1922 if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1923 {
1924 delete_migration_block (pos);
1925 consider_migration_gathering ();
1926 continue;
1927 }
1928 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1929 &consider_migration,
1930 pos);
1931 }
1932 GNUNET_PEER_change_rc (cp->pid, -1);
1933 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1934 if (NULL != cp->cth)
1935 {
1936 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1937 cp->cth = NULL;
1938 }
1939 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1940 {
1941 GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1942 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1943 }
1944 while (NULL != (pm = cp->pending_messages_head))
1945 destroy_pending_message (pm, 0 /* delivery failed */);
1946 GNUNET_LOAD_value_free (cp->transmission_delay);
1947 GNUNET_break (0 == cp->pending_requests);
1948 GNUNET_free (cp);
1949}
1950 131
132/* ***************************** locals ******************************* */
1951 133
1952/** 134/**
1953 * Iterator over hash map entries that removes all occurences 135 * Configuration for block library.
1954 * of the given 'client' from the 'last_client_replies' of the
1955 * given connected peer.
1956 *
1957 * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1958 * @param key current key code (unused)
1959 * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1960 * @return GNUNET_YES (we should continue to iterate)
1961 */ 136 */
1962static int 137static struct GNUNET_CONFIGURATION_Handle *block_cfg;
1963remove_client_from_last_client_replies (void *cls,
1964 const GNUNET_HashCode * key,
1965 void *value)
1966{
1967 struct GNUNET_SERVER_Client *client = cls;
1968 struct ConnectedPeer *cp = value;
1969 unsigned int i;
1970
1971 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1972 {
1973 if (cp->last_client_replies[i] == client)
1974 {
1975 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1976 cp->last_client_replies[i] = NULL;
1977 }
1978 }
1979 return GNUNET_YES;
1980}
1981
1982 138
1983/** 139/**
1984 * A client disconnected. Remove all of its pending queries. 140 * ID of our task that we use to age the cover counters.
1985 *
1986 * @param cls closure, NULL
1987 * @param client identification of the client
1988 */ 141 */
1989static void 142static GNUNET_SCHEDULER_TaskIdentifier cover_age_task;
1990handle_client_disconnect (void *cls,
1991 struct GNUNET_SERVER_Client
1992 * client)
1993{
1994 struct ClientList *pos;
1995 struct ClientList *prev;
1996 struct ClientRequestList *rcl;
1997 struct ClientResponseMessage *creply;
1998
1999 if (client == NULL)
2000 return;
2001 prev = NULL;
2002 pos = client_list;
2003 while ( (NULL != pos) &&
2004 (pos->client != client) )
2005 {
2006 prev = pos;
2007 pos = pos->next;
2008 }
2009 if (pos == NULL)
2010 return; /* no requests pending for this client */
2011 while (NULL != (rcl = pos->rl_head))
2012 {
2013 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2014 "Destroying pending request `%s' on disconnect\n",
2015 GNUNET_h2s (&rcl->req->query));
2016 destroy_pending_request (rcl->req);
2017 }
2018 if (prev == NULL)
2019 client_list = pos->next;
2020 else
2021 prev->next = pos->next;
2022 if (pos->th != NULL)
2023 {
2024 GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
2025 pos->th = NULL;
2026 }
2027 while (NULL != (creply = pos->res_head))
2028 {
2029 GNUNET_CONTAINER_DLL_remove (pos->res_head,
2030 pos->res_tail,
2031 creply);
2032 GNUNET_free (creply);
2033 }
2034 GNUNET_SERVER_client_drop (pos->client);
2035 GNUNET_free (pos);
2036 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2037 &remove_client_from_last_client_replies,
2038 client);
2039}
2040
2041 143
2042/** 144/**
2043 * Iterator to free peer entries. 145 * Datastore 'GET' load tracking.
2044 *
2045 * @param cls closure, unused
2046 * @param key current key code
2047 * @param value value in the hash map (peer entry)
2048 * @return GNUNET_YES (we should continue to iterate)
2049 */ 146 */
2050static int 147static struct GNUNET_LOAD_Value *datastore_get_load;
2051clean_peer (void *cls,
2052 const GNUNET_HashCode * key,
2053 void *value)
2054{
2055 peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
2056 return GNUNET_YES;
2057}
2058
2059 148
2060/** 149/**
2061 * Task run during shutdown. 150 * Identity of this peer.
2062 *
2063 * @param cls unused
2064 * @param tc unused
2065 */ 151 */
2066static void 152static struct GNUNET_PeerIdentity my_id;
2067shutdown_task (void *cls,
2068 const struct GNUNET_SCHEDULER_TaskContext *tc)
2069{
2070 if (mig_qe != NULL)
2071 {
2072 GNUNET_DATASTORE_cancel (mig_qe);
2073 mig_qe = NULL;
2074 }
2075 if (dht_qe != NULL)
2076 {
2077 GNUNET_DATASTORE_cancel (dht_qe);
2078 dht_qe = NULL;
2079 }
2080 if (GNUNET_SCHEDULER_NO_TASK != mig_task)
2081 {
2082 GNUNET_SCHEDULER_cancel (mig_task);
2083 mig_task = GNUNET_SCHEDULER_NO_TASK;
2084 }
2085 if (GNUNET_SCHEDULER_NO_TASK != dht_task)
2086 {
2087 GNUNET_SCHEDULER_cancel (dht_task);
2088 dht_task = GNUNET_SCHEDULER_NO_TASK;
2089 }
2090 while (client_list != NULL)
2091 handle_client_disconnect (NULL,
2092 client_list->client);
2093 cron_flush_trust (NULL, NULL);
2094 GNUNET_assert (NULL != core);
2095 GNUNET_CORE_disconnect (core);
2096 core = NULL;
2097 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2098 &clean_peer,
2099 NULL);
2100 GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
2101 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
2102 requests_by_expiration_heap = 0;
2103 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2104 connected_peers = NULL;
2105 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
2106 GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2107 query_request_map = NULL;
2108 GNUNET_LOAD_value_free (rt_entry_lifetime);
2109 rt_entry_lifetime = NULL;
2110 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
2111 GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2112 peer_request_map = NULL;
2113 if (stats != NULL)
2114 {
2115 GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
2116 stats = NULL;
2117 }
2118 if (dsh != NULL)
2119 {
2120 GNUNET_DATASTORE_disconnect (dsh,
2121 GNUNET_NO);
2122 dsh = NULL;
2123 }
2124 while (mig_head != NULL)
2125 delete_migration_block (mig_head);
2126 GNUNET_assert (0 == mig_size);
2127 GNUNET_DHT_disconnect (dht_handle);
2128 dht_handle = NULL;
2129 GNUNET_LOAD_value_free (datastore_get_load);
2130 datastore_get_load = NULL;
2131 GNUNET_LOAD_value_free (datastore_put_load);
2132 datastore_put_load = NULL;
2133 GNUNET_BLOCK_context_destroy (block_ctx);
2134 block_ctx = NULL;
2135 GNUNET_CONFIGURATION_destroy (block_cfg);
2136 block_cfg = NULL;
2137 cfg = NULL;
2138 GNUNET_free_non_null (trustDirectory);
2139 trustDirectory = NULL;
2140 GNUNET_SCHEDULER_cancel (cover_age_task);
2141 cover_age_task = GNUNET_SCHEDULER_NO_TASK;
2142}
2143
2144
2145/* ******************* Utility functions ******************** */
2146
2147 153
2148/** 154/**
2149 * We've had to delay a request for transmission to core, but now 155 * Task that periodically ages our cover traffic statistics.
2150 * we should be ready. Run it.
2151 * 156 *
2152 * @param cls the 'struct ConnectedPeer' for which a request was delayed 157 * @param cls unused closure
2153 * @param tc task context (unused) 158 * @param tc task context
2154 */ 159 */
2155static void 160static void
2156delayed_transmission_request (void *cls, 161age_cover_counters (void *cls,
2157 const struct GNUNET_SCHEDULER_TaskContext *tc) 162 const struct GNUNET_SCHEDULER_TaskContext *tc)
2158{ 163{
2159 struct ConnectedPeer *cp = cls; 164 GSF_cover_content_count = (GSF_cover_content_count * 15) / 16;
2160 struct GNUNET_PeerIdentity pid; 165 GSF_cover_query_count = (GSF_cover_query_count * 15) / 16;
2161 struct PendingMessage *pm; 166 cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
2162 167 &age_cover_counters,
2163 pm = cp->pending_messages_head; 168 NULL);
2164 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2165 GNUNET_assert (cp->cth == NULL);
2166 if (pm == NULL)
2167 return;
2168 GNUNET_PEER_resolve (cp->pid,
2169 &pid);
2170 cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2171 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2172 GNUNET_YES,
2173 pm->priority,
2174 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2175 &pid,
2176 pm->msize,
2177 &transmit_to_peer,
2178 cp);
2179} 169}
2180 170
2181 171
2182/**
2183 * Transmit messages by copying it to the target buffer
2184 * "buf". "buf" will be NULL and "size" zero if the socket was closed
2185 * for writing in the meantime. In that case, do nothing
2186 * (the disconnect or shutdown handler will take care of the rest).
2187 * If we were able to transmit messages and there are still more
2188 * pending, ask core again for further calls to this function.
2189 *
2190 * @param cls closure, pointer to the 'struct ConnectedPeer*'
2191 * @param size number of bytes available in buf
2192 * @param buf where the callee should write the message
2193 * @return number of bytes written to buf
2194 */
2195static size_t
2196transmit_to_peer (void *cls,
2197 size_t size, void *buf)
2198{
2199 struct ConnectedPeer *cp = cls;
2200 char *cbuf = buf;
2201 struct PendingMessage *pm;
2202 struct PendingMessage *next_pm;
2203 struct GNUNET_TIME_Absolute now;
2204 struct GNUNET_TIME_Relative min_delay;
2205 struct MigrationReadyBlock *mb;
2206 struct MigrationReadyBlock *next;
2207 struct PutMessage migm;
2208 size_t msize;
2209 unsigned int i;
2210 struct GNUNET_PeerIdentity pid;
2211
2212 cp->cth = NULL;
2213 if (NULL == buf)
2214 {
2215#if DEBUG_FS
2216 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2217 "Dropping message, core too busy.\n");
2218#endif
2219 GNUNET_LOAD_update (cp->transmission_delay,
2220 UINT64_MAX);
2221
2222 if (NULL != (pm = cp->pending_messages_head))
2223 {
2224 GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2225 cp->pending_messages_tail,
2226 pm);
2227 GNUNET_assert (cp->pending_requests > 0);
2228 cp->pending_requests--;
2229 destroy_pending_message (pm, 0);
2230 }
2231 if (NULL != (pm = cp->pending_messages_head))
2232 {
2233 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2234 min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until);
2235 cp->delayed_transmission_request_task
2236 = GNUNET_SCHEDULER_add_delayed (min_delay,
2237 &delayed_transmission_request,
2238 cp);
2239 }
2240 return 0;
2241 }
2242 GNUNET_LOAD_update (cp->transmission_delay,
2243 GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).rel_value);
2244 now = GNUNET_TIME_absolute_get ();
2245 msize = 0;
2246 min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2247 next_pm = cp->pending_messages_head;
2248 while ( (NULL != (pm = next_pm) ) &&
2249 (pm->msize <= size) )
2250 {
2251 next_pm = pm->next;
2252 if (pm->delay_until.abs_value > now.abs_value)
2253 {
2254 min_delay = GNUNET_TIME_relative_min (min_delay,
2255 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2256 continue;
2257 }
2258 memcpy (&cbuf[msize], &pm[1], pm->msize);
2259 msize += pm->msize;
2260 size -= pm->msize;
2261 if (NULL == pm->pml)
2262 {
2263 GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2264 cp->pending_messages_tail,
2265 pm);
2266 GNUNET_assert (cp->pending_requests > 0);
2267 cp->pending_requests--;
2268 }
2269 destroy_pending_message (pm, cp->pid);
2270 }
2271 if (pm != NULL)
2272 min_delay = GNUNET_TIME_UNIT_ZERO;
2273 if (NULL != cp->pending_messages_head)
2274 {
2275 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2276 cp->delayed_transmission_request_task
2277 = GNUNET_SCHEDULER_add_delayed (min_delay,
2278 &delayed_transmission_request,
2279 cp);
2280 }
2281 if (pm == NULL)
2282 {
2283 GNUNET_PEER_resolve (cp->pid,
2284 &pid);
2285 next = mig_head;
2286 while (NULL != (mb = next))
2287 {
2288 next = mb->next;
2289 for (i=0;i<MIGRATION_LIST_SIZE;i++)
2290 {
2291 if ( (cp->pid == mb->target_list[i]) &&
2292 (mb->size + sizeof (migm) <= size) )
2293 {
2294 GNUNET_PEER_change_rc (mb->target_list[i], -1);
2295 mb->target_list[i] = 0;
2296 mb->used_targets++;
2297 memset (&migm, 0, sizeof (migm));
2298 migm.header.size = htons (sizeof (migm) + mb->size);
2299 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2300 migm.type = htonl (mb->type);
2301 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2302 memcpy (&cbuf[msize], &migm, sizeof (migm));
2303 msize += sizeof (migm);
2304 size -= sizeof (migm);
2305 memcpy (&cbuf[msize], &mb[1], mb->size);
2306 msize += mb->size;
2307 size -= mb->size;
2308#if DEBUG_FS
2309 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2310 "Pushing migration block `%s' (%u bytes) to `%s'\n",
2311 GNUNET_h2s (&mb->query),
2312 (unsigned int) mb->size,
2313 GNUNET_i2s (&pid));
2314#endif
2315 break;
2316 }
2317 else
2318 {
2319#if DEBUG_FS
2320 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2321 "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2322 GNUNET_h2s (&mb->query),
2323 (unsigned int) mb->size,
2324 GNUNET_i2s (&pid));
2325#endif
2326 }
2327 }
2328 if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2329 (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2330 {
2331 delete_migration_block (mb);
2332 consider_migration_gathering ();
2333 }
2334 }
2335 consider_migration (NULL,
2336 &pid.hashPubKey,
2337 cp);
2338 }
2339#if DEBUG_FS
2340 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2341 "Transmitting %u bytes to peer with PID %u\n",
2342 (unsigned int) msize,
2343 (unsigned int) cp->pid);
2344#endif
2345 return msize;
2346}
2347
2348 172
2349/** 173/**
2350 * Add a message to the set of pending messages for the given peer. 174 * We've just now completed a datastore request. Update our
175 * datastore load calculations.
2351 * 176 *
2352 * @param cp peer to send message to 177 * @param start time when the datastore request was issued
2353 * @param pm message to queue
2354 * @param pr request on which behalf this message is being queued
2355 */ 178 */
2356static void 179void
2357add_to_pending_messages_for_peer (struct ConnectedPeer *cp, 180GSF_update_datastore_delay_ (struct GNUNET_TIME_Absolute start)
2358 struct PendingMessage *pm,
2359 struct PendingRequest *pr)
2360{ 181{
2361 struct PendingMessage *pos; 182 struct GNUNET_TIME_Relative delay;
2362 struct PendingMessageList *pml;
2363 struct GNUNET_PeerIdentity pid;
2364 183
2365 GNUNET_assert (pm->next == NULL); 184 delay = GNUNET_TIME_absolute_get_duration (start);
2366 GNUNET_assert (pm->pml == NULL); 185 GNUNET_LOAD_update (datastore_get_load,
2367 if (pr != NULL) 186 delay.rel_value);
2368 {
2369 pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2370 pml->req = pr;
2371 pml->target = cp;
2372 pml->pm = pm;
2373 pm->pml = pml;
2374 GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2375 pr->pending_tail,
2376 pml);
2377 }
2378 pos = cp->pending_messages_head;
2379 while ( (pos != NULL) &&
2380 (pm->priority < pos->priority) )
2381 pos = pos->next;
2382 GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2383 cp->pending_messages_tail,
2384 pos,
2385 pm);
2386 cp->pending_requests++;
2387 if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2388 {
2389 GNUNET_STATISTICS_update (stats,
2390 gettext_noop ("# P2P searches discarded (queue length bound)"),
2391 1,
2392 GNUNET_NO);
2393 destroy_pending_message (cp->pending_messages_tail, 0);
2394 }
2395 GNUNET_PEER_resolve (cp->pid, &pid);
2396 if (NULL != cp->cth)
2397 {
2398 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2399 cp->cth = NULL;
2400 }
2401 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2402 {
2403 GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
2404 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2405 }
2406 /* need to schedule transmission */
2407 cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2408 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2409 GNUNET_YES,
2410 cp->pending_messages_head->priority,
2411 MAX_TRANSMIT_DELAY,
2412 &pid,
2413 cp->pending_messages_head->msize,
2414 &transmit_to_peer,
2415 cp);
2416 if (cp->cth == NULL)
2417 {
2418#if DEBUG_FS
2419 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2420 "Failed to schedule transmission with core!\n");
2421#endif
2422 GNUNET_STATISTICS_update (stats,
2423 gettext_noop ("# CORE transmission failures"),
2424 1,
2425 GNUNET_NO);
2426 }
2427} 187}
2428 188
2429 189
@@ -2436,8 +196,8 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2436 * GNUNET_NO to process normally (load normal) 196 * GNUNET_NO to process normally (load normal)
2437 * GNUNET_SYSERR to process for free (load low) 197 * GNUNET_SYSERR to process for free (load low)
2438 */ 198 */
2439static int 199int
2440test_get_load_too_high (uint32_t priority) 200GSF_test_get_load_too_high_ (uint32_t priority)
2441{ 201{
2442 double ld; 202 double ld;
2443 203
@@ -2450,1220 +210,80 @@ test_get_load_too_high (uint32_t priority)
2450} 210}
2451 211
2452 212
2453
2454
2455/**
2456 * Test if the DATABASE (PUT) load on this peer is too high
2457 * to even consider processing the query at
2458 * all.
2459 *
2460 * @return GNUNET_YES if the load is too high to do anything (load high)
2461 * GNUNET_NO to process normally (load normal or low)
2462 */
2463static int
2464test_put_load_too_high (uint32_t priority)
2465{
2466 double ld;
2467
2468 if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2469 return GNUNET_NO; /* very fast */
2470 ld = GNUNET_LOAD_get_load (datastore_put_load);
2471 if (ld < 2.0 * (1 + priority))
2472 return GNUNET_NO;
2473 GNUNET_STATISTICS_update (stats,
2474 gettext_noop ("# storage requests dropped due to high load"),
2475 1,
2476 GNUNET_NO);
2477 return GNUNET_YES;
2478}
2479
2480
2481/* ******************* Pending Request Refresh Task ******************** */
2482
2483
2484
2485/**
2486 * We use a random delay to make the timing of requests less
2487 * predictable. This function returns such a random delay. We add a base
2488 * delay of MAX_CORK_DELAY (1s).
2489 *
2490 * FIXME: make schedule dependent on the specifics of the request?
2491 * Or bandwidth and number of connected peers and load?
2492 *
2493 * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2494 */
2495static struct GNUNET_TIME_Relative
2496get_processing_delay ()
2497{
2498 return
2499 GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2500 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2501 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2502 TTL_DECREMENT)));
2503}
2504
2505
2506/**
2507 * We're processing a GET request from another peer and have decided
2508 * to forward it to other peers. This function is called periodically
2509 * and should forward the request to other peers until we have all
2510 * possible replies. If we have transmitted the *only* reply to
2511 * the initiator we should destroy the pending request. If we have
2512 * many replies in the queue to the initiator, we should delay sending
2513 * out more queries until the reply queue has shrunk some.
2514 *
2515 * @param cls our "struct ProcessGetContext *"
2516 * @param tc unused
2517 */
2518static void
2519forward_request_task (void *cls,
2520 const struct GNUNET_SCHEDULER_TaskContext *tc);
2521
2522
2523/**
2524 * Function called after we either failed or succeeded
2525 * at transmitting a query to a peer.
2526 *
2527 * @param cls the requests "struct PendingRequest*"
2528 * @param tpid ID of receiving peer, 0 on transmission error
2529 */
2530static void
2531transmit_query_continuation (void *cls,
2532 GNUNET_PEER_Id tpid)
2533{
2534 struct PendingRequest *pr = cls;
2535 unsigned int i;
2536
2537 GNUNET_STATISTICS_update (stats,
2538 gettext_noop ("# queries scheduled for forwarding"),
2539 -1,
2540 GNUNET_NO);
2541 if (tpid == 0)
2542 {
2543#if DEBUG_FS
2544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2545 "Transmission of request failed, will try again later.\n");
2546#endif
2547 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2548 pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2549 &forward_request_task,
2550 pr);
2551 return;
2552 }
2553#if DEBUG_FS
2554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2555 "Transmitted query `%s'\n",
2556 GNUNET_h2s (&pr->query));
2557#endif
2558 GNUNET_STATISTICS_update (stats,
2559 gettext_noop ("# queries forwarded"),
2560 1,
2561 GNUNET_NO);
2562 for (i=0;i<pr->used_targets_off;i++)
2563 if (pr->used_targets[i].pid == tpid)
2564 break; /* found match! */
2565 if (i == pr->used_targets_off)
2566 {
2567 /* need to create new entry */
2568 if (pr->used_targets_off == pr->used_targets_size)
2569 GNUNET_array_grow (pr->used_targets,
2570 pr->used_targets_size,
2571 pr->used_targets_size * 2 + 2);
2572 GNUNET_PEER_change_rc (tpid, 1);
2573 pr->used_targets[pr->used_targets_off].pid = tpid;
2574 pr->used_targets[pr->used_targets_off].num_requests = 0;
2575 i = pr->used_targets_off++;
2576 }
2577 pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2578 pr->used_targets[i].num_requests++;
2579 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2580 pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2581 &forward_request_task,
2582 pr);
2583}
2584
2585
2586/**
2587 * How many bytes should a bloomfilter be if we have already seen
2588 * entry_count responses? Note that BLOOMFILTER_K gives us the number
2589 * of bits set per entry. Furthermore, we should not re-size the
2590 * filter too often (to keep it cheap).
2591 *
2592 * Since other peers will also add entries but not resize the filter,
2593 * we should generally pick a slightly larger size than what the
2594 * strict math would suggest.
2595 *
2596 * @return must be a power of two and smaller or equal to 2^15.
2597 */
2598static size_t
2599compute_bloomfilter_size (unsigned int entry_count)
2600{
2601 size_t size;
2602 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2603 uint16_t max = 1 << 15;
2604
2605 if (entry_count > max)
2606 return max;
2607 size = 8;
2608 while ((size < max) && (size < ideal))
2609 size *= 2;
2610 if (size > max)
2611 return max;
2612 return size;
2613}
2614
2615
2616/**
2617 * Recalculate our bloom filter for filtering replies. This function
2618 * will create a new bloom filter from scratch, so it should only be
2619 * called if we have no bloomfilter at all (and hence can create a
2620 * fresh one of minimal size without problems) OR if our peer is the
2621 * initiator (in which case we may resize to larger than mimimum size).
2622 *
2623 * @param pr request for which the BF is to be recomputed
2624 */
2625static void
2626refresh_bloomfilter (struct PendingRequest *pr)
2627{
2628 unsigned int i;
2629 size_t nsize;
2630 GNUNET_HashCode mhash;
2631
2632 nsize = compute_bloomfilter_size (pr->replies_seen_off);
2633 if (nsize == pr->bf_size)
2634 return; /* size not changed */
2635 if (pr->bf != NULL)
2636 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2637 pr->bf_size = nsize;
2638 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2639 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2640 pr->bf_size,
2641 BLOOMFILTER_K);
2642 for (i=0;i<pr->replies_seen_off;i++)
2643 {
2644 GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2645 pr->mingle,
2646 &mhash);
2647 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2648 }
2649}
2650
2651
2652/**
2653 * Function called after we've tried to reserve a certain amount of
2654 * bandwidth for a reply. Check if we succeeded and if so send our
2655 * query.
2656 *
2657 * @param cls the requests "struct PendingRequest*"
2658 * @param peer identifies the peer
2659 * @param bpm_out set to the current bandwidth limit (sending) for this peer
2660 * @param amount set to the amount that was actually reserved or unreserved
2661 * @param res_delay if the reservation could not be satisfied (amount was 0), how
2662 * long should the client wait until re-trying?
2663 * @param preference current traffic preference for the given peer
2664 */
2665static void
2666target_reservation_cb (void *cls,
2667 const struct
2668 GNUNET_PeerIdentity * peer,
2669 struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2670 int32_t amount,
2671 struct GNUNET_TIME_Relative res_delay,
2672 uint64_t preference)
2673{
2674 struct PendingRequest *pr = cls;
2675 struct ConnectedPeer *cp;
2676 struct PendingMessage *pm;
2677 struct GetMessage *gm;
2678 GNUNET_HashCode *ext;
2679 char *bfdata;
2680 size_t msize;
2681 unsigned int k;
2682 int no_route;
2683 uint32_t bm;
2684 unsigned int i;
2685
2686#if DEBUG_FS
2687 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2688 "Core called back... for query `%s'.\n",
2689 GNUNET_h2s (&pr->query));
2690#endif
2691 /* (3) transmit, update ttl/priority */
2692 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2693 &peer->hashPubKey);
2694 if (cp == NULL)
2695 {
2696 /* Peer must have just left */
2697#if DEBUG_FS
2698 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2699 "Selected peer disconnected!\n");
2700#endif
2701 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2702 pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2703 &forward_request_task,
2704 pr);
2705 return;
2706 }
2707 cp->irc = NULL;
2708 pr->pirc = NULL;
2709 if (peer == NULL)
2710 {
2711 /* error in communication with core, try again later */
2712 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2713 pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2714 &forward_request_task,
2715 pr);
2716 return;
2717 }
2718 no_route = GNUNET_NO;
2719 if (amount == 0)
2720 {
2721 if (pr->cp == NULL)
2722 {
2723#if DEBUG_FS > 1
2724 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2725 "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2726 amount,
2727 DBLOCK_SIZE);
2728#endif
2729 GNUNET_STATISTICS_update (stats,
2730 gettext_noop ("# reply bandwidth reservation requests failed"),
2731 1,
2732 GNUNET_NO);
2733 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2734 pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2735 &forward_request_task,
2736 pr);
2737 return; /* this target round failed */
2738 }
2739 no_route = GNUNET_YES;
2740 }
2741
2742 GNUNET_STATISTICS_update (stats,
2743 gettext_noop ("# queries scheduled for forwarding"),
2744 1,
2745 GNUNET_NO);
2746 for (i=0;i<pr->used_targets_off;i++)
2747 if (pr->used_targets[i].pid == cp->pid)
2748 {
2749 GNUNET_STATISTICS_update (stats,
2750 gettext_noop ("# queries retransmitted to same target"),
2751 1,
2752 GNUNET_NO);
2753 break;
2754 }
2755
2756 /* build message and insert message into priority queue */
2757#if DEBUG_FS
2758 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2759 "Forwarding request `%s' to `%4s'!\n",
2760 GNUNET_h2s (&pr->query),
2761 GNUNET_i2s (peer));
2762#endif
2763 k = 0;
2764 bm = 0;
2765 if (GNUNET_YES == no_route)
2766 {
2767 bm |= GET_MESSAGE_BIT_RETURN_TO;
2768 k++;
2769 }
2770 if (pr->namespace != NULL)
2771 {
2772 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2773 k++;
2774 }
2775 if (pr->target_pid != 0)
2776 {
2777 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2778 k++;
2779 }
2780 msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2781 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2782 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2783 pm->msize = msize;
2784 gm = (struct GetMessage*) &pm[1];
2785 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2786 gm->header.size = htons (msize);
2787 gm->type = htonl (pr->type);
2788 pr->remaining_priority /= 2;
2789 gm->priority = htonl (pr->remaining_priority);
2790 gm->ttl = htonl (pr->ttl);
2791 gm->filter_mutator = htonl(pr->mingle);
2792 gm->hash_bitmap = htonl (bm);
2793 gm->query = pr->query;
2794 ext = (GNUNET_HashCode*) &gm[1];
2795 k = 0;
2796 if (GNUNET_YES == no_route)
2797 GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2798 if (pr->namespace != NULL)
2799 memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2800 if (pr->target_pid != 0)
2801 GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2802 bfdata = (char *) &ext[k];
2803 if (pr->bf != NULL)
2804 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2805 bfdata,
2806 pr->bf_size);
2807 pm->cont = &transmit_query_continuation;
2808 pm->cont_cls = pr;
2809 cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2810 add_to_pending_messages_for_peer (cp, pm, pr);
2811}
2812
2813
2814/**
2815 * Closure used for "target_peer_select_cb".
2816 */
2817struct PeerSelectionContext
2818{
2819 /**
2820 * The request for which we are selecting
2821 * peers.
2822 */
2823 struct PendingRequest *pr;
2824
2825 /**
2826 * Current "prime" target.
2827 */
2828 struct GNUNET_PeerIdentity target;
2829
2830 /**
2831 * How much do we like this target?
2832 */
2833 double target_score;
2834
2835 /**
2836 * Does it make sense to we re-try quickly again?
2837 */
2838 int fast_retry;
2839
2840};
2841
2842
2843/**
2844 * Function called for each connected peer to determine
2845 * which one(s) would make good targets for forwarding.
2846 *
2847 * @param cls closure (struct PeerSelectionContext)
2848 * @param key current key code (peer identity)
2849 * @param value value in the hash map (struct ConnectedPeer)
2850 * @return GNUNET_YES if we should continue to
2851 * iterate,
2852 * GNUNET_NO if not.
2853 */
2854static int
2855target_peer_select_cb (void *cls,
2856 const GNUNET_HashCode * key,
2857 void *value)
2858{
2859 struct PeerSelectionContext *psc = cls;
2860 struct ConnectedPeer *cp = value;
2861 struct PendingRequest *pr = psc->pr;
2862 struct GNUNET_TIME_Relative delay;
2863 double score;
2864 unsigned int i;
2865 unsigned int pc;
2866
2867 /* 1) check that this peer is not the initiator */
2868 if (cp == pr->cp)
2869 {
2870#if DEBUG_FS
2871 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2872 "Skipping initiator in forwarding selection\n");
2873#endif
2874 return GNUNET_YES; /* skip */
2875 }
2876 if (cp->irc != NULL)
2877 {
2878 psc->fast_retry = GNUNET_YES;
2879 return GNUNET_YES; /* skip: already querying core about this peer for other reasons */
2880 }
2881
2882 /* 2) check if we have already (recently) forwarded to this peer */
2883 /* 2a) this particular request */
2884 pc = 0;
2885 for (i=0;i<pr->used_targets_off;i++)
2886 if (pr->used_targets[i].pid == cp->pid)
2887 {
2888 pc = pr->used_targets[i].num_requests;
2889 GNUNET_assert (pc > 0);
2890 /* FIXME: make re-enabling a peer independent of how often
2891 this function is called??? */
2892 if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2893 RETRY_PROBABILITY_INV * pc))
2894 {
2895#if DEBUG_FS
2896 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2897 "NOT re-trying query that was previously transmitted %u times\n",
2898 (unsigned int) pc);
2899#endif
2900 return GNUNET_YES; /* skip */
2901 }
2902 break;
2903 }
2904#if DEBUG_FS
2905 if (0 < pc)
2906 {
2907 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2908 "Re-trying query that was previously transmitted %u times to this peer\n",
2909 (unsigned int) pc);
2910 }
2911#endif
2912 /* 2b) many other requests to this peer */
2913 delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2914 if (delay.rel_value <= cp->avg_delay.rel_value)
2915 {
2916#if DEBUG_FS
2917 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2918 "NOT sending query since we send %u others to this peer in the last %llums\n",
2919 MAX_QUEUE_PER_PEER,
2920 cp->avg_delay.rel_value);
2921#endif
2922 return GNUNET_YES; /* skip */
2923 }
2924
2925 /* 3) calculate how much we'd like to forward to this peer,
2926 starting with a random value that is strong enough
2927 to at least give any peer a chance sometimes
2928 (compared to the other factors that come later) */
2929 /* 3a) count successful (recent) routes from cp for same source */
2930 if (pr->cp != NULL)
2931 {
2932 score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2933 P2P_SUCCESS_LIST_SIZE);
2934 for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2935 if (cp->last_p2p_replies[i] == pr->cp->pid)
2936 score += 1.0; /* likely successful based on hot path */
2937 }
2938 else
2939 {
2940 score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2941 CS2P_SUCCESS_LIST_SIZE);
2942 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2943 if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2944 score += 1.0; /* likely successful based on hot path */
2945 }
2946 /* 3b) include latency */
2947 if (cp->avg_delay.rel_value < 4 * TTL_DECREMENT)
2948 score += 1.0; /* likely fast based on latency */
2949 /* 3c) include priorities */
2950 if (cp->avg_priority <= pr->remaining_priority / 2.0)
2951 score += 1.0; /* likely successful based on priorities */
2952 /* 3d) penalize for queue size */
2953 score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER);
2954 /* 3e) include peer proximity */
2955 score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2956 &pr->query)) / (double) UINT32_MAX);
2957 /* 4) super-bonus for being the known target */
2958 if (pr->target_pid == cp->pid)
2959 score += 100.0;
2960 /* store best-fit in closure */
2961 score++; /* avoid zero */
2962 if (score > psc->target_score)
2963 {
2964 psc->target_score = score;
2965 psc->target.hashPubKey = *key;
2966 }
2967#if DEBUG_FS
2968 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2969 "Peer `%s' gets score %f for forwarding query, max is %8f\n",
2970 GNUNET_h2s (key),
2971 score,
2972 psc->target_score);
2973#endif
2974 return GNUNET_YES;
2975}
2976
2977
2978/**
2979 * The priority level imposes a bound on the maximum
2980 * value for the ttl that can be requested.
2981 *
2982 * @param ttl_in requested ttl
2983 * @param prio given priority
2984 * @return ttl_in if ttl_in is below the limit,
2985 * otherwise the ttl-limit for the given priority
2986 */
2987static int32_t
2988bound_ttl (int32_t ttl_in, uint32_t prio)
2989{
2990 unsigned long long allowed;
2991
2992 if (ttl_in <= 0)
2993 return ttl_in;
2994 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
2995 if (ttl_in > allowed)
2996 {
2997 if (allowed >= (1 << 30))
2998 return 1 << 30;
2999 return allowed;
3000 }
3001 return ttl_in;
3002}
3003
3004
3005/**
3006 * Iterator called on each result obtained for a DHT
3007 * operation that expects a reply
3008 *
3009 * @param cls closure
3010 * @param exp when will this value expire
3011 * @param key key of the result
3012 * @param get_path NULL-terminated array of pointers
3013 * to the peers on reverse GET path (or NULL if not recorded)
3014 * @param put_path NULL-terminated array of pointers
3015 * to the peers on the PUT path (or NULL if not recorded)
3016 * @param type type of the result
3017 * @param size number of bytes in data
3018 * @param data pointer to the result data
3019 */
3020static void
3021process_dht_reply (void *cls,
3022 struct GNUNET_TIME_Absolute exp,
3023 const GNUNET_HashCode * key,
3024 const struct GNUNET_PeerIdentity * const *get_path,
3025 const struct GNUNET_PeerIdentity * const *put_path,
3026 enum GNUNET_BLOCK_Type type,
3027 size_t size,
3028 const void *data);
3029
3030
3031/**
3032 * We're processing a GET request and have decided
3033 * to forward it to other peers. This function is called periodically
3034 * and should forward the request to other peers until we have all
3035 * possible replies. If we have transmitted the *only* reply to
3036 * the initiator we should destroy the pending request. If we have
3037 * many replies in the queue to the initiator, we should delay sending
3038 * out more queries until the reply queue has shrunk some.
3039 *
3040 * @param cls our "struct ProcessGetContext *"
3041 * @param tc unused
3042 */
3043static void
3044forward_request_task (void *cls,
3045 const struct GNUNET_SCHEDULER_TaskContext *tc)
3046{
3047 struct PendingRequest *pr = cls;
3048 struct PeerSelectionContext psc;
3049 struct ConnectedPeer *cp;
3050 struct GNUNET_TIME_Relative delay;
3051
3052 pr->task = GNUNET_SCHEDULER_NO_TASK;
3053 if (pr->pirc != NULL)
3054 {
3055#if DEBUG_FS
3056 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3057 "Forwarding of query `%s' not attempted due to pending local lookup!\n",
3058 GNUNET_h2s (&pr->query));
3059#endif
3060 return; /* already pending */
3061 }
3062 if (GNUNET_YES == pr->local_only)
3063 return; /* configured to not do P2P search */
3064 /* (0) try DHT */
3065 if ( (0 == pr->anonymity_level) &&
3066 (GNUNET_YES != pr->forward_only) &&
3067 (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
3068 (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
3069 {
3070 pr->dht_get = GNUNET_DHT_get_start (dht_handle,
3071 GNUNET_TIME_UNIT_FOREVER_REL,
3072 pr->type,
3073 &pr->query,
3074 DEFAULT_GET_REPLICATION,
3075 GNUNET_DHT_RO_NONE,
3076 pr->bf,
3077 pr->mingle,
3078 pr->namespace,
3079 (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3080 &process_dht_reply,
3081 pr);
3082 }
3083
3084 if ( (pr->anonymity_level > 1) &&
3085 (cover_query_count < pr->anonymity_level - 1) )
3086 {
3087 delay = get_processing_delay ();
3088#if DEBUG_FS
3089 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3090 "Not enough cover traffic to forward query `%s', will try again in %llu ms!\n",
3091 GNUNET_h2s (&pr->query),
3092 delay.rel_value);
3093#endif
3094 pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3095 &forward_request_task,
3096 pr);
3097 return;
3098 }
3099 /* consume cover traffic */
3100 if (pr->anonymity_level > 1)
3101 cover_query_count -= pr->anonymity_level - 1;
3102
3103 /* (1) select target */
3104 psc.pr = pr;
3105 psc.target_score = -DBL_MAX;
3106 psc.fast_retry = GNUNET_NO;
3107 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
3108 &target_peer_select_cb,
3109 &psc);
3110 if (psc.target_score == -DBL_MAX)
3111 {
3112 if (psc.fast_retry == GNUNET_YES)
3113 delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */
3114 else
3115 delay = get_processing_delay ();
3116#if DEBUG_FS
3117 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3118 "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
3119 GNUNET_h2s (&pr->query),
3120 delay.rel_value);
3121#endif
3122 pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3123 &forward_request_task,
3124 pr);
3125 return; /* nobody selected */
3126 }
3127 /* (3) update TTL/priority */
3128 if (pr->client_request_list != NULL)
3129 {
3130 /* FIXME: use better algorithm!? */
3131 if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3132 4))
3133 pr->priority++;
3134 /* bound priority we use by priorities we see from other peers
3135 rounded up (must round up so that we can see non-zero
3136 priorities, but round up as little as possible to make it
3137 plausible that we forwarded another peers request) */
3138 if (pr->priority > current_priorities + 1.0)
3139 pr->priority = (uint32_t) current_priorities + 1.0;
3140 pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
3141 pr->priority);
3142#if DEBUG_FS
3143 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3144 "Trying query `%s' with priority %u and TTL %d.\n",
3145 GNUNET_h2s (&pr->query),
3146 pr->priority,
3147 pr->ttl);
3148#endif
3149 }
3150
3151 /* (3) reserve reply bandwidth */
3152 if (GNUNET_NO == pr->forward_only)
3153 {
3154 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3155 &psc.target.hashPubKey);
3156 GNUNET_assert (NULL != cp);
3157 GNUNET_assert (cp->irc == NULL);
3158 pr->pirc = cp;
3159 cp->pr = pr;
3160#if DEBUG_FS
3161 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3162 "Asking core for bandwidth for query `%s'.\n",
3163 GNUNET_h2s (&pr->query));
3164#endif
3165 cp->irc = GNUNET_CORE_peer_change_preference (core,
3166 &psc.target,
3167 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3168 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
3169 DBLOCK_SIZE * 2,
3170 cp->inc_preference,
3171 &target_reservation_cb,
3172 pr);
3173 GNUNET_assert (cp->irc != NULL);
3174 cp->inc_preference = 0;
3175 }
3176 else
3177 {
3178 /* force forwarding */
3179 static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
3180 target_reservation_cb (pr, &psc.target,
3181 zerobw, 0,
3182 GNUNET_TIME_UNIT_FOREVER_REL,
3183 0.0);
3184 }
3185}
3186
3187
3188/* **************************** P2P PUT Handling ************************ */
3189
3190
3191/**
3192 * Function called after we either failed or succeeded
3193 * at transmitting a reply to a peer.
3194 *
3195 * @param cls the requests "struct PendingRequest*"
3196 * @param tpid ID of receiving peer, 0 on transmission error
3197 */
3198static void
3199transmit_reply_continuation (void *cls,
3200 GNUNET_PEER_Id tpid)
3201{
3202 struct PendingRequest *pr = cls;
3203
3204 switch (pr->type)
3205 {
3206 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3207 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3208 /* only one reply expected, done with the request! */
3209 destroy_pending_request (pr);
3210 break;
3211 case GNUNET_BLOCK_TYPE_ANY:
3212 case GNUNET_BLOCK_TYPE_FS_KBLOCK:
3213 case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3214 break;
3215 default:
3216 GNUNET_break (0);
3217 break;
3218 }
3219}
3220
3221
3222/**
3223 * Transmit the given message by copying it to the target buffer
3224 * "buf". "buf" will be NULL and "size" zero if the socket was closed
3225 * for writing in the meantime. In that case, do nothing
3226 * (the disconnect or shutdown handler will take care of the rest).
3227 * If we were able to transmit messages and there are still more
3228 * pending, ask core again for further calls to this function.
3229 *
3230 * @param cls closure, pointer to the 'struct ClientList*'
3231 * @param size number of bytes available in buf
3232 * @param buf where the callee should write the message
3233 * @return number of bytes written to buf
3234 */
3235static size_t
3236transmit_to_client (void *cls,
3237 size_t size, void *buf)
3238{
3239 struct ClientList *cl = cls;
3240 char *cbuf = buf;
3241 struct ClientResponseMessage *creply;
3242 size_t msize;
3243
3244 cl->th = NULL;
3245 if (NULL == buf)
3246 {
3247#if DEBUG_FS
3248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3249 "Not sending reply, client communication problem.\n");
3250#endif
3251 return 0;
3252 }
3253 msize = 0;
3254 while ( (NULL != (creply = cl->res_head) ) &&
3255 (creply->msize <= size) )
3256 {
3257 memcpy (&cbuf[msize], &creply[1], creply->msize);
3258 msize += creply->msize;
3259 size -= creply->msize;
3260 GNUNET_CONTAINER_DLL_remove (cl->res_head,
3261 cl->res_tail,
3262 creply);
3263 GNUNET_free (creply);
3264 }
3265 if (NULL != creply)
3266 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3267 creply->msize,
3268 GNUNET_TIME_UNIT_FOREVER_REL,
3269 &transmit_to_client,
3270 cl);
3271#if DEBUG_FS
3272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3273 "Transmitted %u bytes to client\n",
3274 (unsigned int) msize);
3275#endif
3276 return msize;
3277}
3278
3279
3280/**
3281 * Closure for "process_reply" function.
3282 */
3283struct ProcessReplyClosure
3284{
3285 /**
3286 * The data for the reply.
3287 */
3288 const void *data;
3289
3290 /**
3291 * Who gave us this reply? NULL for local host (or DHT)
3292 */
3293 struct ConnectedPeer *sender;
3294
3295 /**
3296 * When the reply expires.
3297 */
3298 struct GNUNET_TIME_Absolute expiration;
3299
3300 /**
3301 * Size of data.
3302 */
3303 size_t size;
3304
3305 /**
3306 * Type of the block.
3307 */
3308 enum GNUNET_BLOCK_Type type;
3309
3310 /**
3311 * How much was this reply worth to us?
3312 */
3313 uint32_t priority;
3314
3315 /**
3316 * Anonymity requirements for this reply.
3317 */
3318 uint32_t anonymity_level;
3319
3320 /**
3321 * Evaluation result (returned).
3322 */
3323 enum GNUNET_BLOCK_EvaluationResult eval;
3324
3325 /**
3326 * Did we finish processing the associated request?
3327 */
3328 int finished;
3329
3330 /**
3331 * Did we find a matching request?
3332 */
3333 int request_found;
3334};
3335
3336
3337/** 213/**
3338 * We have received a reply; handle it! 214 * Handle P2P "PUT" message.
3339 * 215 *
3340 * @param cls response (struct ProcessReplyClosure) 216 * @param cls closure, always NULL
3341 * @param key our query 217 * @param other the other peer involved (sender or receiver, NULL
3342 * @param value value in the hash map (info about the query) 218 * for loopback messages where we are both sender and receiver)
3343 * @return GNUNET_YES (we should continue to iterate) 219 * @param message the actual message
220 * @param atsi performance information
221 * @return GNUNET_OK to keep the connection open,
222 * GNUNET_SYSERR to close it (signal serious error)
3344 */ 223 */
3345static int 224static int
3346process_reply (void *cls, 225handle_p2p_put (void *cls,
3347 const GNUNET_HashCode * key, 226 const struct GNUNET_PeerIdentity *other,
3348 void *value) 227 const struct GNUNET_MessageHeader *message,
228 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3349{ 229{
3350 struct ProcessReplyClosure *prq = cls; 230 struct GSF_ConnectedPeer *cp;
3351 struct PendingRequest *pr = value;
3352 struct PendingMessage *reply;
3353 struct ClientResponseMessage *creply;
3354 struct ClientList *cl;
3355 struct PutMessage *pm;
3356 struct ConnectedPeer *cp;
3357 struct GNUNET_TIME_Relative cur_delay;
3358#if SUPPORT_DELAYS
3359struct GNUNET_TIME_Relative art_delay;
3360#endif
3361 size_t msize;
3362 unsigned int i;
3363 231
3364 if (NULL == pr->client_request_list) 232 cp = GSF_peer_get_ (other);
3365 { 233 if (NULL == cp)
3366 /* reply will go over the network, check for cover traffic */
3367 if ( (prq->anonymity_level > 1) &&
3368 (cover_content_count < prq->anonymity_level - 1) )
3369 {
3370 /* insufficient cover traffic, skip */
3371 GNUNET_STATISTICS_update (stats,
3372 gettext_noop ("# replies suppressed due to lack of cover traffic"),
3373 1,
3374 GNUNET_NO);
3375 return GNUNET_YES;
3376 }
3377 if (prq->anonymity_level > 1)
3378 cover_content_count -= prq->anonymity_level - 1;
3379 }
3380#if DEBUG_FS
3381 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3382 "Matched result (type %u) for query `%s' with pending request\n",
3383 (unsigned int) prq->type,
3384 GNUNET_h2s (key));
3385#endif
3386 GNUNET_STATISTICS_update (stats,
3387 gettext_noop ("# replies received and matched"),
3388 1,
3389 GNUNET_NO);
3390 if (prq->sender != NULL)
3391 { 234 {
3392 for (i=0;i<pr->used_targets_off;i++)
3393 if (pr->used_targets[i].pid == prq->sender->pid)
3394 break;
3395 if (i < pr->used_targets_off)
3396 {
3397 cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);
3398 prq->sender->avg_delay.rel_value
3399 = (prq->sender->avg_delay.rel_value *
3400 (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N;
3401 prq->sender->avg_priority
3402 = (prq->sender->avg_priority *
3403 (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3404 }
3405 if (pr->cp != NULL)
3406 {
3407 GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3408 [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
3409 -1);
3410 GNUNET_PEER_change_rc (pr->cp->pid, 1);
3411 prq->sender->last_p2p_replies
3412 [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3413 = pr->cp->pid;
3414 }
3415 else
3416 {
3417 if (NULL != prq->sender->last_client_replies
3418 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3419 GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3420 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3421 prq->sender->last_client_replies
3422 [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3423 = pr->client_request_list->client_list->client;
3424 GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3425 }
3426 }
3427 prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3428 prq->type,
3429 key,
3430 &pr->bf,
3431 pr->mingle,
3432 pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3433 prq->data,
3434 prq->size);
3435 switch (prq->eval)
3436 {
3437 case GNUNET_BLOCK_EVALUATION_OK_MORE:
3438 break;
3439 case GNUNET_BLOCK_EVALUATION_OK_LAST:
3440 while (NULL != pr->pending_head)
3441 destroy_pending_message_list_entry (pr->pending_head);
3442 if (pr->qe != NULL)
3443 {
3444 if (pr->client_request_list != NULL)
3445 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
3446 GNUNET_YES);
3447 GNUNET_DATASTORE_cancel (pr->qe);
3448 pr->qe = NULL;
3449 }
3450 pr->do_remove = GNUNET_YES;
3451 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3452 {
3453 GNUNET_SCHEDULER_cancel (pr->task);
3454 pr->task = GNUNET_SCHEDULER_NO_TASK;
3455 }
3456 GNUNET_break (GNUNET_YES ==
3457 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3458 key,
3459 pr));
3460 GNUNET_LOAD_update (rt_entry_lifetime,
3461 GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
3462 break;
3463 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3464 GNUNET_STATISTICS_update (stats,
3465 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3466 1,
3467 GNUNET_NO);
3468#if DEBUG_FS
3469/* GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3470 "Duplicate response `%s', discarding.\n",
3471 GNUNET_h2s (&mhash));*/
3472#endif
3473 return GNUNET_YES; /* duplicate */
3474 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3475 return GNUNET_YES; /* wrong namespace */
3476 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3477 GNUNET_break (0);
3478 return GNUNET_YES;
3479 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3480 GNUNET_break (0); 235 GNUNET_break (0);
3481 return GNUNET_YES; 236 return GNUNET_OK;
3482 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3483 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3484 _("Unsupported block type %u\n"),
3485 prq->type);
3486 return GNUNET_NO;
3487 }
3488 if (pr->client_request_list != NULL)
3489 {
3490 if (pr->replies_seen_size == pr->replies_seen_off)
3491 GNUNET_array_grow (pr->replies_seen,
3492 pr->replies_seen_size,
3493 pr->replies_seen_size * 2 + 4);
3494 GNUNET_CRYPTO_hash (prq->data,
3495 prq->size,
3496 &pr->replies_seen[pr->replies_seen_off++]);
3497 refresh_bloomfilter (pr);
3498 }
3499 if (NULL == prq->sender)
3500 {
3501#if DEBUG_FS
3502 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3503 "Found result for query `%s' in local datastore\n",
3504 GNUNET_h2s (key));
3505#endif
3506 GNUNET_STATISTICS_update (stats,
3507 gettext_noop ("# results found locally"),
3508 1,
3509 GNUNET_NO);
3510 }
3511 prq->priority += pr->remaining_priority;
3512 pr->remaining_priority = 0;
3513 pr->results_found++;
3514 prq->request_found = GNUNET_YES;
3515 if (NULL != pr->client_request_list)
3516 {
3517 GNUNET_STATISTICS_update (stats,
3518 gettext_noop ("# replies received for local clients"),
3519 1,
3520 GNUNET_NO);
3521 cl = pr->client_request_list->client_list;
3522 msize = sizeof (struct PutMessage) + prq->size;
3523 creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3524 creply->msize = msize;
3525 creply->client_list = cl;
3526 GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3527 cl->res_tail,
3528 cl->res_tail,
3529 creply);
3530 pm = (struct PutMessage*) &creply[1];
3531 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3532 pm->header.size = htons (msize);
3533 pm->type = htonl (prq->type);
3534 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3535 memcpy (&pm[1], prq->data, prq->size);
3536 if (NULL == cl->th)
3537 {
3538#if DEBUG_FS
3539 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3540 "Transmitting result for query `%s' to client\n",
3541 GNUNET_h2s (key));
3542#endif
3543 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3544 msize,
3545 GNUNET_TIME_UNIT_FOREVER_REL,
3546 &transmit_to_client,
3547 cl);
3548 }
3549 GNUNET_break (cl->th != NULL);
3550 if (pr->do_remove)
3551 {
3552 prq->finished = GNUNET_YES;
3553 destroy_pending_request (pr);
3554 }
3555 }
3556 else
3557 {
3558 cp = pr->cp;
3559#if DEBUG_FS
3560 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3561 "Transmitting result for query `%s' to other peer (PID=%u)\n",
3562 GNUNET_h2s (key),
3563 (unsigned int) cp->pid);
3564#endif
3565 GNUNET_STATISTICS_update (stats,
3566 gettext_noop ("# replies received for other peers"),
3567 1,
3568 GNUNET_NO);
3569 msize = sizeof (struct PutMessage) + prq->size;
3570 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3571 reply->cont = &transmit_reply_continuation;
3572 reply->cont_cls = pr;
3573#if SUPPORT_DELAYS
3574 art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3575 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3576 TTL_DECREMENT));
3577 reply->delay_until
3578 = GNUNET_TIME_relative_to_absolute (art_delay);
3579 GNUNET_STATISTICS_update (stats,
3580 gettext_noop ("cummulative artificial delay introduced (ms)"),
3581 art_delay.abs_value,
3582 GNUNET_NO);
3583#endif
3584 reply->msize = msize;
3585 reply->priority = UINT32_MAX; /* send replies first! */
3586 pm = (struct PutMessage*) &reply[1];
3587 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3588 pm->header.size = htons (msize);
3589 pm->type = htonl (prq->type);
3590 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3591 memcpy (&pm[1], prq->data, prq->size);
3592 add_to_pending_messages_for_peer (cp, reply, pr);
3593 } 237 }
3594 return GNUNET_YES; 238 return GSF_handle_p2p_content_ (cp, message);
3595} 239}
3596 240
3597 241
3598/** 242/**
3599 * Iterator called on each result obtained for a DHT 243 * We have a new request, consider forwarding it to the given
3600 * operation that expects a reply 244 * peer.
3601 * 245 *
3602 * @param cls closure 246 * @param cls the 'struct GSF_PendingRequest'
3603 * @param exp when will this value expire 247 * @param peer identity of the peer
3604 * @param key key of the result 248 * @param cp handle to the connected peer record
3605 * @param get_path NULL-terminated array of pointers 249 * @param ppd peer performance data
3606 * to the peers on reverse GET path (or NULL if not recorded)
3607 * @param put_path NULL-terminated array of pointers
3608 * to the peers on the PUT path (or NULL if not recorded)
3609 * @param type type of the result
3610 * @param size number of bytes in data
3611 * @param data pointer to the result data
3612 */ 250 */
3613static void 251static void
3614process_dht_reply (void *cls, 252consider_request_for_forwarding (void *cls,
3615 struct GNUNET_TIME_Absolute exp, 253 const struct GNUNET_PeerIdentity *peer,
3616 const GNUNET_HashCode * key, 254 struct GSF_ConnectedPeer *cp,
3617 const struct GNUNET_PeerIdentity * const *get_path, 255 const struct GSF_PeerPerformanceData *ppd)
3618 const struct GNUNET_PeerIdentity * const *put_path,
3619 enum GNUNET_BLOCK_Type type,
3620 size_t size,
3621 const void *data)
3622{ 256{
3623 struct PendingRequest *pr = cls; 257 struct GSF_PendingRequest *pr = cls;
3624 struct ProcessReplyClosure prq;
3625 258
3626 memset (&prq, 0, sizeof (prq)); 259 GSF_plan_add_ (cp, pr);
3627 prq.data = data;
3628 prq.expiration = exp;
3629 prq.size = size;
3630 prq.type = type;
3631 process_reply (&prq, key, pr);
3632} 260}
3633 261
3634 262
3635
3636/** 263/**
3637 * Continuation called to notify client about result of the 264 * Function to be called after we're done processing
3638 * operation. 265 * replies from the local lookup. If the result status
266 * code indicates that there may be more replies, plan
267 * forwarding the request.
3639 * 268 *
3640 * @param cls closure 269 * @param cls closure (NULL)
3641 * @param success GNUNET_SYSERR on failure 270 * @param pr the pending request we were processing
3642 * @param msg NULL on success, otherwise an error message 271 * @param result final datastore lookup result
3643 */ 272 */
3644static void 273static void
3645put_migration_continuation (void *cls, 274consider_forwarding (void *cls,
3646 int success, 275 struct GSF_PendingRequest *pr,
3647 const char *msg) 276 enum GNUNET_BLOCK_EvaluationResult result)
3648{ 277{
3649 struct GNUNET_TIME_Absolute *start = cls; 278 if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
3650 struct GNUNET_TIME_Relative delay; 279 return; /* we're done... */
3651 280 GSF_iterate_connected_peers_ (&consider_request_for_forwarding,
3652 delay = GNUNET_TIME_absolute_get_duration (*start); 281 pr);
3653 GNUNET_free (start);
3654 GNUNET_LOAD_update (datastore_put_load,
3655 delay.rel_value);
3656 if (GNUNET_OK == success)
3657 return;
3658 GNUNET_STATISTICS_update (stats,
3659 gettext_noop ("# datastore 'put' failures"),
3660 1,
3661 GNUNET_NO);
3662} 282}
3663 283
3664 284
3665/** 285/**
3666 * Handle P2P "PUT" message. 286 * Handle P2P "GET" request.
3667 * 287 *
3668 * @param cls closure, always NULL 288 * @param cls closure, always NULL
3669 * @param other the other peer involved (sender or receiver, NULL 289 * @param other the other peer involved (sender or receiver, NULL
@@ -3674,963 +294,176 @@ put_migration_continuation (void *cls,
3674 * GNUNET_SYSERR to close it (signal serious error) 294 * GNUNET_SYSERR to close it (signal serious error)
3675 */ 295 */
3676static int 296static int
3677handle_p2p_put (void *cls, 297handle_p2p_get (void *cls,
3678 const struct GNUNET_PeerIdentity *other, 298 const struct GNUNET_PeerIdentity *other,
3679 const struct GNUNET_MessageHeader *message, 299 const struct GNUNET_MessageHeader *message,
3680 const struct GNUNET_TRANSPORT_ATS_Information *atsi) 300 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3681{ 301{
3682 const struct PutMessage *put; 302 struct GSF_PendingRequest *pr;
3683 uint16_t msize;
3684 size_t dsize;
3685 enum GNUNET_BLOCK_Type type;
3686 struct GNUNET_TIME_Absolute expiration;
3687 GNUNET_HashCode query;
3688 struct ProcessReplyClosure prq;
3689 struct GNUNET_TIME_Absolute *start;
3690 struct GNUNET_TIME_Relative block_time;
3691 double putl;
3692 struct ConnectedPeer *cp;
3693 struct PendingMessage *pm;
3694 struct MigrationStopMessage *msm;
3695
3696 msize = ntohs (message->size);
3697 if (msize < sizeof (struct PutMessage))
3698 {
3699 GNUNET_break_op(0);
3700 return GNUNET_SYSERR;
3701 }
3702 put = (const struct PutMessage*) message;
3703 dsize = msize - sizeof (struct PutMessage);
3704 type = ntohl (put->type);
3705 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3706 303
3707 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 304 pr = GSF_handle_p2p_query_ (other, message);
305 if (NULL == pr)
3708 return GNUNET_SYSERR; 306 return GNUNET_SYSERR;
3709 if (GNUNET_OK != 307 GSF_local_lookup_ (pr,
3710 GNUNET_BLOCK_get_key (block_ctx, 308 &consider_forwarding,
3711 type, 309 NULL);
3712 &put[1],
3713 dsize,
3714 &query))
3715 {
3716 GNUNET_break_op (0);
3717 return GNUNET_SYSERR;
3718 }
3719 cover_content_count++;
3720#if DEBUG_FS
3721 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3722 "Received result for query `%s' from peer `%4s'\n",
3723 GNUNET_h2s (&query),
3724 GNUNET_i2s (other));
3725#endif
3726 GNUNET_STATISTICS_update (stats,
3727 gettext_noop ("# replies received (overall)"),
3728 1,
3729 GNUNET_NO);
3730 /* now, lookup 'query' */
3731 prq.data = (const void*) &put[1];
3732 if (other != NULL)
3733 prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3734 &other->hashPubKey);
3735 else
3736 prq.sender = NULL;
3737 prq.size = dsize;
3738 prq.type = type;
3739 prq.expiration = expiration;
3740 prq.priority = 0;
3741 prq.anonymity_level = 1;
3742 prq.finished = GNUNET_NO;
3743 prq.request_found = GNUNET_NO;
3744 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3745 &query,
3746 &process_reply,
3747 &prq);
3748 if (prq.sender != NULL)
3749 {
3750 prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3751 change_host_trust (prq.sender, prq.priority);
3752 }
3753 if ( (GNUNET_YES == active_to_migration) &&
3754 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3755 {
3756#if DEBUG_FS
3757 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3758 "Replicating result for query `%s' with priority %u\n",
3759 GNUNET_h2s (&query),
3760 prq.priority);
3761#endif
3762 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3763 *start = GNUNET_TIME_absolute_get ();
3764 GNUNET_DATASTORE_put (dsh,
3765 0, &query, dsize, &put[1],
3766 type, prq.priority, 1 /* anonymity */,
3767 0 /* replication */,
3768 expiration,
3769 1 + prq.priority,
3770 MAX_DATASTORE_QUEUE,
3771 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3772 &put_migration_continuation,
3773 start);
3774 }
3775 putl = GNUNET_LOAD_get_load (datastore_put_load);
3776 if ( (NULL != (cp = prq.sender)) &&
3777 (GNUNET_NO == prq.request_found) &&
3778 ( (GNUNET_YES != active_to_migration) ||
3779 (putl > 2.5 * (1 + prq.priority)) ) )
3780 {
3781 if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000)
3782 return GNUNET_OK; /* already blocked */
3783 /* We're too busy; send MigrationStop message! */
3784 if (GNUNET_YES != active_to_migration)
3785 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3786 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3787 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3788 (unsigned int) (60000 * putl * putl)));
3789
3790 cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3791 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
3792 sizeof (struct MigrationStopMessage));
3793 pm->msize = sizeof (struct MigrationStopMessage);
3794 pm->priority = UINT32_MAX;
3795 msm = (struct MigrationStopMessage*) &pm[1];
3796 msm->header.size = htons (sizeof (struct MigrationStopMessage));
3797 msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3798 msm->duration = GNUNET_TIME_relative_hton (block_time);
3799 add_to_pending_messages_for_peer (cp,
3800 pm,
3801 NULL);
3802 }
3803 return GNUNET_OK;
3804}
3805
3806
3807/**
3808 * Handle P2P "MIGRATION_STOP" message.
3809 *
3810 * @param cls closure, always NULL
3811 * @param other the other peer involved (sender or receiver, NULL
3812 * for loopback messages where we are both sender and receiver)
3813 * @param message the actual message
3814 * @param atsi performance information
3815 * @return GNUNET_OK to keep the connection open,
3816 * GNUNET_SYSERR to close it (signal serious error)
3817 */
3818static int
3819handle_p2p_migration_stop (void *cls,
3820 const struct GNUNET_PeerIdentity *other,
3821 const struct GNUNET_MessageHeader *message,
3822 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3823{
3824 struct ConnectedPeer *cp;
3825 const struct MigrationStopMessage *msm;
3826
3827 msm = (const struct MigrationStopMessage*) message;
3828 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3829 &other->hashPubKey);
3830 if (cp == NULL)
3831 {
3832 GNUNET_break (0);
3833 return GNUNET_OK;
3834 }
3835 cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3836 return GNUNET_OK; 310 return GNUNET_OK;
3837} 311}
3838 312
3839 313
3840
3841/* **************************** P2P GET Handling ************************ */
3842
3843
3844/**
3845 * Closure for 'check_duplicate_request_{peer,client}'.
3846 */
3847struct CheckDuplicateRequestClosure
3848{
3849 /**
3850 * The new request we should check if it already exists.
3851 */
3852 const struct PendingRequest *pr;
3853
3854 /**
3855 * Existing request found by the checker, NULL if none.
3856 */
3857 struct PendingRequest *have;
3858};
3859
3860
3861/**
3862 * Iterator over entries in the 'query_request_map' that
3863 * tries to see if we have the same request pending from
3864 * the same client already.
3865 *
3866 * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3867 * @param key current key code (query, ignored, must match)
3868 * @param value value in the hash map (a 'struct PendingRequest'
3869 * that already exists)
3870 * @return GNUNET_YES if we should continue to
3871 * iterate (no match yet)
3872 * GNUNET_NO if not (match found).
3873 */
3874static int
3875check_duplicate_request_client (void *cls,
3876 const GNUNET_HashCode * key,
3877 void *value)
3878{
3879 struct CheckDuplicateRequestClosure *cdc = cls;
3880 struct PendingRequest *have = value;
3881
3882 if (have->client_request_list == NULL)
3883 return GNUNET_YES;
3884 if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3885 (cdc->pr != have) )
3886 {
3887 cdc->have = have;
3888 return GNUNET_NO;
3889 }
3890 return GNUNET_YES;
3891}
3892
3893
3894/** 314/**
3895 * We're processing (local) results for a search request 315 * We're done with the local lookup, now consider
3896 * from another peer. Pass applicable results to the 316 * P2P processing (depending on request options and
3897 * peer and if we are done either clean up (operation 317 * result status). Also signal that we can now
3898 * complete) or forward to other peers (more results possible). 318 * receive more request information from the client.
3899 * 319 *
3900 * @param cls our closure (struct LocalGetContext) 320 * @param cls the client doing the request ('struct GNUNET_SERVER_Client')
3901 * @param key key for the content 321 * @param pr the pending request we were processing
3902 * @param size number of bytes in data 322 * @param result final datastore lookup result
3903 * @param data content stored
3904 * @param type type of the content
3905 * @param priority priority of the content
3906 * @param anonymity anonymity-level for the content
3907 * @param expiration expiration time for the content
3908 * @param uid unique identifier for the datum;
3909 * maybe 0 if no unique identifier is available
3910 */ 323 */
3911static void 324static void
3912process_local_reply (void *cls, 325start_p2p_processing (void *cls,
3913 const GNUNET_HashCode * key, 326 struct GSF_PendingRequest *pr,
3914 size_t size, 327 enum GNUNET_BLOCK_EvaluationResult result)
3915 const void *data,
3916 enum GNUNET_BLOCK_Type type,
3917 uint32_t priority,
3918 uint32_t anonymity,
3919 struct GNUNET_TIME_Absolute
3920 expiration,
3921 uint64_t uid)
3922{ 328{
3923 struct PendingRequest *pr = cls; 329 struct GNUNET_SERVER_Client *client = cls;
3924 struct ProcessReplyClosure prq; 330 struct GSF_PendingRequestData *prd;
3925 struct CheckDuplicateRequestClosure cdrc; 331
3926 GNUNET_HashCode query; 332 prd = GSF_pending_request_get_data_ (pr);
3927 unsigned int old_rf;
3928
3929 if (NULL == key)
3930 {
3931#if DEBUG_FS > 1
3932 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3933 "Done processing local replies, forwarding request to other peers.\n");
3934#endif
3935 pr->qe = NULL;
3936 if (pr->client_request_list != NULL)
3937 {
3938 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
3939 GNUNET_YES);
3940 /* Figure out if this is a duplicate request and possibly
3941 merge 'struct PendingRequest' entries */
3942 cdrc.have = NULL;
3943 cdrc.pr = pr;
3944 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3945 &pr->query,
3946 &check_duplicate_request_client,
3947 &cdrc);
3948 if (cdrc.have != NULL)
3949 {
3950#if DEBUG_FS
3951 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3952 "Received request for block `%s' twice from client, will only request once.\n",
3953 GNUNET_h2s (&pr->query));
3954#endif
3955
3956 destroy_pending_request (pr);
3957 return;
3958 }
3959 }
3960 if (pr->local_only == GNUNET_YES)
3961 {
3962 destroy_pending_request (pr);
3963 return;
3964 }
3965 /* no more results */
3966 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3967 pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
3968 pr);
3969 return;
3970 }
3971#if DEBUG_FS 333#if DEBUG_FS
3972 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 334 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3973 "New local response to `%s' of type %u.\n", 335 "Finished database lookup for local request `%s' with result %d\n",
3974 GNUNET_h2s (key), 336 GNUNET_h2s (&prd->query),
3975 type); 337 result);
3976#endif
3977 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3978 {
3979#if DEBUG_FS
3980 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3981 "Found ONDEMAND block, performing on-demand encoding\n");
3982#endif 338#endif
3983 GNUNET_STATISTICS_update (stats, 339 GNUNET_SERVER_receive_done (client,
3984 gettext_noop ("# on-demand blocks matched requests"), 340 GNUNET_OK);
3985 1, 341 if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
3986 GNUNET_NO); 342 return; /* we're done, 'pr' was already destroyed... */
3987 if (GNUNET_OK != 343 if (0 != (GSF_PRO_LOCAL_ONLY & prd->options) )
3988 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
3989 anonymity, expiration, uid,
3990 &process_local_reply,
3991 pr))
3992 if (pr->qe != NULL)
3993 {
3994 GNUNET_DATASTORE_iterate_get_next (dsh);
3995 }
3996 return;
3997 }
3998 old_rf = pr->results_found;
3999 memset (&prq, 0, sizeof (prq));
4000 prq.data = data;
4001 prq.expiration = expiration;
4002 prq.size = size;
4003 if (GNUNET_OK !=
4004 GNUNET_BLOCK_get_key (block_ctx,
4005 type,
4006 data,
4007 size,
4008 &query))
4009 { 344 {
4010 GNUNET_break (0); 345 GSF_pending_request_cancel_ (pr);
4011 GNUNET_DATASTORE_remove (dsh,
4012 key,
4013 size, data,
4014 -1, -1,
4015 GNUNET_TIME_UNIT_FOREVER_REL,
4016 NULL, NULL);
4017 GNUNET_DATASTORE_iterate_get_next (dsh);
4018 return; 346 return;
4019 } 347 }
4020 prq.type = type; 348 GSF_dht_lookup_ (pr);
4021 prq.priority = priority; 349 consider_forwarding (NULL, pr, result);
4022 prq.finished = GNUNET_NO;
4023 prq.request_found = GNUNET_NO;
4024 prq.anonymity_level = anonymity;
4025 if ( (old_rf == 0) &&
4026 (pr->results_found == 0) )
4027 update_datastore_delays (pr->start_time);
4028 process_reply (&prq, key, pr);
4029 if (prq.finished == GNUNET_YES)
4030 return;
4031 if (pr->qe == NULL)
4032 return; /* done here */
4033 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
4034 {
4035 pr->local_only = GNUNET_YES; /* do not forward */
4036 GNUNET_DATASTORE_iterate_get_next (dsh);
4037 return;
4038 }
4039 if ( (pr->client_request_list == NULL) &&
4040 ( (GNUNET_YES == test_get_load_too_high (0)) ||
4041 (pr->results_found > 5 + 2 * pr->priority) ) )
4042 {
4043#if DEBUG_FS > 2
4044 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4045 "Load too high, done with request\n");
4046#endif
4047 GNUNET_STATISTICS_update (stats,
4048 gettext_noop ("# processing result set cut short due to load"),
4049 1,
4050 GNUNET_NO);
4051 GNUNET_DATASTORE_iterate_get_next (dsh);
4052 return;
4053 }
4054 GNUNET_DATASTORE_iterate_get_next (dsh);
4055} 350}
4056 351
4057 352
4058/** 353/**
4059 * We've received a request with the specified priority. Bound it 354 * Handle START_SEARCH-message (search request from client).
4060 * according to how much we trust the given peer. 355 *
4061 * 356 * @param cls closure
4062 * @param prio_in requested priority 357 * @param client identification of the client
4063 * @param cp the peer making the request 358 * @param message the actual message
4064 * @return effective priority
4065 */ 359 */
4066static int32_t 360static void
4067bound_priority (uint32_t prio_in, 361handle_start_search (void *cls,
4068 struct ConnectedPeer *cp) 362 struct GNUNET_SERVER_Client *client,
363 const struct GNUNET_MessageHeader *message)
4069{ 364{
4070#define N ((double)128.0) 365 struct GSF_PendingRequest *pr;
4071 uint32_t ret;
4072 double rret;
4073 int ld;
4074 366
4075 ld = test_get_load_too_high (0); 367 pr = GSF_local_client_start_search_handler_ (client, message);
4076 if (ld == GNUNET_SYSERR) 368 if (NULL == pr)
4077 {
4078 GNUNET_STATISTICS_update (stats,
4079 gettext_noop ("# requests done for free (low load)"),
4080 1,
4081 GNUNET_NO);
4082 return 0; /* excess resources */
4083 }
4084 if (prio_in > INT32_MAX)
4085 prio_in = INT32_MAX;
4086 ret = - change_host_trust (cp, - (int) prio_in);
4087 if (ret > 0)
4088 {
4089 if (ret > current_priorities + N)
4090 rret = current_priorities + N;
4091 else
4092 rret = ret;
4093 current_priorities
4094 = (current_priorities * (N-1) + rret)/N;
4095 }
4096 if ( (ld == GNUNET_YES) && (ret > 0) )
4097 { 369 {
4098 /* try with charging */ 370 /* 'GNUNET_SERVER_receive_done was already called! */
4099 ld = test_get_load_too_high (ret); 371 return;
4100 }
4101 if (ld == GNUNET_YES)
4102 {
4103 GNUNET_STATISTICS_update (stats,
4104 gettext_noop ("# request dropped, priority insufficient"),
4105 1,
4106 GNUNET_NO);
4107 /* undo charge */
4108 change_host_trust (cp, (int) ret);
4109 return -1; /* not enough resources */
4110 }
4111 else
4112 {
4113 GNUNET_STATISTICS_update (stats,
4114 gettext_noop ("# requests done for a price (normal load)"),
4115 1,
4116 GNUNET_NO);
4117 } 372 }
4118#undef N 373 GSF_local_lookup_ (pr,
4119 return ret; 374 &start_p2p_processing,
375 client);
4120} 376}
4121 377
4122 378
4123/** 379/**
4124 * Iterator over entries in the 'query_request_map' that 380 * Task run during shutdown.
4125 * tries to see if we have the same request pending from
4126 * the same peer already.
4127 * 381 *
4128 * @param cls closure (our 'struct CheckDuplicateRequestClosure') 382 * @param cls unused
4129 * @param key current key code (query, ignored, must match) 383 * @param tc unused
4130 * @param value value in the hash map (a 'struct PendingRequest'
4131 * that already exists)
4132 * @return GNUNET_YES if we should continue to
4133 * iterate (no match yet)
4134 * GNUNET_NO if not (match found).
4135 */ 384 */
4136static int 385static void
4137check_duplicate_request_peer (void *cls, 386shutdown_task (void *cls,
4138 const GNUNET_HashCode * key, 387 const struct GNUNET_SCHEDULER_TaskContext *tc)
4139 void *value)
4140{ 388{
4141 struct CheckDuplicateRequestClosure *cdc = cls; 389 if (NULL != GSF_core)
4142 struct PendingRequest *have = value; 390 {
4143 391 GNUNET_CORE_disconnect (GSF_core);
4144 if (cdc->pr->target_pid == have->target_pid) 392 GSF_core = NULL;
393 }
394 GSF_put_done_ ();
395 GSF_push_done_ ();
396 GSF_pending_request_done_ ();
397 GSF_plan_done ();
398 GSF_connected_peer_done_ ();
399 GNUNET_DATASTORE_disconnect (GSF_dsh, GNUNET_NO);
400 GSF_dsh = NULL;
401 GNUNET_DHT_disconnect (GSF_dht);
402 GSF_dht = NULL;
403 GNUNET_BLOCK_context_destroy (GSF_block_ctx);
404 GSF_block_ctx = NULL;
405 GNUNET_CONFIGURATION_destroy (block_cfg);
406 block_cfg = NULL;
407 GNUNET_STATISTICS_destroy (GSF_stats, GNUNET_NO);
408 if (GNUNET_SCHEDULER_NO_TASK != cover_age_task)
4145 { 409 {
4146 cdc->have = have; 410 GNUNET_SCHEDULER_cancel (cover_age_task);
4147 return GNUNET_NO; 411 cover_age_task = GNUNET_SCHEDULER_NO_TASK;
4148 } 412 }
4149 return GNUNET_YES; 413 GNUNET_FS_indexing_done ();
414 GNUNET_LOAD_value_free (datastore_get_load);
415 datastore_get_load = NULL;
416 GNUNET_LOAD_value_free (GSF_rt_entry_lifetime);
417 GSF_rt_entry_lifetime = NULL;
4150} 418}
4151 419
4152 420
4153/** 421/**
4154 * Handle P2P "GET" request. 422 * Function called for each pending request whenever a new
423 * peer connects, giving us a chance to decide about submitting
424 * the existing request to the new peer.
4155 * 425 *
4156 * @param cls closure, always NULL 426 * @param cls the 'struct GSF_ConnectedPeer' of the new peer
4157 * @param other the other peer involved (sender or receiver, NULL 427 * @param key query for the request
4158 * for loopback messages where we are both sender and receiver) 428 * @param pr handle to the pending request
4159 * @param message the actual message 429 * @return GNUNET_YES to continue to iterate
4160 * @param atsi performance information
4161 * @return GNUNET_OK to keep the connection open,
4162 * GNUNET_SYSERR to close it (signal serious error)
4163 */ 430 */
4164static int 431static int
4165handle_p2p_get (void *cls, 432consider_peer_for_forwarding (void *cls,
4166 const struct GNUNET_PeerIdentity *other, 433 const GNUNET_HashCode *key,
4167 const struct GNUNET_MessageHeader *message, 434 struct GSF_PendingRequest *pr)
4168 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4169{ 435{
4170 struct PendingRequest *pr; 436 struct GSF_ConnectedPeer *cp = cls;
4171 struct ConnectedPeer *cp;
4172 struct ConnectedPeer *cps;
4173 struct CheckDuplicateRequestClosure cdc;
4174 struct GNUNET_TIME_Relative timeout;
4175 uint16_t msize;
4176 const struct GetMessage *gm;
4177 unsigned int bits;
4178 const GNUNET_HashCode *opt;
4179 uint32_t bm;
4180 size_t bfsize;
4181 uint32_t ttl_decrement;
4182 int32_t priority;
4183 enum GNUNET_BLOCK_Type type;
4184 int have_ns;
4185
4186 msize = ntohs(message->size);
4187 if (msize < sizeof (struct GetMessage))
4188 {
4189 GNUNET_break_op (0);
4190 return GNUNET_SYSERR;
4191 }
4192 gm = (const struct GetMessage*) message;
4193#if DEBUG_FS
4194 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4195 "Received request for `%s'\n",
4196 GNUNET_h2s (&gm->query));
4197#endif
4198 type = ntohl (gm->type);
4199 bm = ntohl (gm->hash_bitmap);
4200 bits = 0;
4201 while (bm > 0)
4202 {
4203 if (1 == (bm & 1))
4204 bits++;
4205 bm >>= 1;
4206 }
4207 if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
4208 {
4209 GNUNET_break_op (0);
4210 return GNUNET_SYSERR;
4211 }
4212 opt = (const GNUNET_HashCode*) &gm[1];
4213 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
4214 /* bfsize must be power of 2, check! */
4215 if (0 != ( (bfsize - 1) & bfsize))
4216 {
4217 GNUNET_break_op (0);
4218 return GNUNET_SYSERR;
4219 }
4220 cover_query_count++;
4221 bm = ntohl (gm->hash_bitmap);
4222 bits = 0;
4223 cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4224 &other->hashPubKey);
4225 if (NULL == cps)
4226 {
4227 /* peer must have just disconnected */
4228 GNUNET_STATISTICS_update (stats,
4229 gettext_noop ("# requests dropped due to initiator not being connected"),
4230 1,
4231 GNUNET_NO);
4232 return GNUNET_SYSERR;
4233 }
4234 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4235 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4236 &opt[bits++]);
4237 else
4238 cp = cps;
4239 if (cp == NULL)
4240 {
4241#if DEBUG_FS
4242 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4244 "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
4245 GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
4246
4247 else
4248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4249 "Failed to find peer `%4s' in connection set. Dropping query.\n",
4250 GNUNET_i2s (other));
4251#endif
4252 GNUNET_STATISTICS_update (stats,
4253 gettext_noop ("# requests dropped due to missing reverse route"),
4254 1,
4255 GNUNET_NO);
4256 /* FIXME: try connect? */
4257 return GNUNET_OK;
4258 }
4259 /* note that we can really only check load here since otherwise
4260 peers could find out that we are overloaded by not being
4261 disconnected after sending us a malformed query... */
4262 priority = bound_priority (ntohl (gm->priority), cps);
4263 if (priority < 0)
4264 {
4265#if DEBUG_FS
4266 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4267 "Dropping query from `%s', this peer is too busy.\n",
4268 GNUNET_i2s (other));
4269#endif
4270 return GNUNET_OK;
4271 }
4272#if DEBUG_FS
4273 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4274 "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4275 GNUNET_h2s (&gm->query),
4276 (unsigned int) type,
4277 GNUNET_i2s (other),
4278 (unsigned int) bm);
4279#endif
4280 have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4281 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
4282 (have_ns ? sizeof(GNUNET_HashCode) : 0));
4283 if (have_ns)
4284 {
4285 pr->namespace = (GNUNET_HashCode*) &pr[1];
4286 memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4287 }
4288 if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4289 (GNUNET_LOAD_get_average (cp->transmission_delay) >
4290 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4291 {
4292 /* don't have BW to send to peer, or would likely take longer than we have for it,
4293 so at best indirect the query */
4294 priority = 0;
4295 pr->forward_only = GNUNET_YES;
4296 }
4297 pr->type = type;
4298 pr->mingle = ntohl (gm->filter_mutator);
4299 if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4300 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4301 pr->anonymity_level = 1;
4302 pr->priority = (uint32_t) priority;
4303 pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4304 pr->query = gm->query;
4305 /* decrement ttl (always) */
4306 ttl_decrement = 2 * TTL_DECREMENT +
4307 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4308 TTL_DECREMENT);
4309 if ( (pr->ttl < 0) &&
4310 (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4311 {
4312#if DEBUG_FS
4313 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4314 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4315 GNUNET_i2s (other),
4316 pr->ttl,
4317 ttl_decrement);
4318#endif
4319 GNUNET_STATISTICS_update (stats,
4320 gettext_noop ("# requests dropped due TTL underflow"),
4321 1,
4322 GNUNET_NO);
4323 /* integer underflow => drop (should be very rare)! */
4324 GNUNET_free (pr);
4325 return GNUNET_OK;
4326 }
4327 pr->ttl -= ttl_decrement;
4328 pr->start_time = GNUNET_TIME_absolute_get ();
4329
4330 /* get bloom filter */
4331 if (bfsize > 0)
4332 {
4333 pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4334 bfsize,
4335 BLOOMFILTER_K);
4336 pr->bf_size = bfsize;
4337 }
4338 cdc.have = NULL;
4339 cdc.pr = pr;
4340 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4341 &gm->query,
4342 &check_duplicate_request_peer,
4343 &cdc);
4344 if (cdc.have != NULL)
4345 {
4346 if (cdc.have->start_time.abs_value + cdc.have->ttl >=
4347 pr->start_time.abs_value + pr->ttl)
4348 {
4349 /* existing request has higher TTL, drop new one! */
4350 cdc.have->priority += pr->priority;
4351 destroy_pending_request (pr);
4352#if DEBUG_FS
4353 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4354 "Have existing request with higher TTL, dropping new request.\n",
4355 GNUNET_i2s (other));
4356#endif
4357 GNUNET_STATISTICS_update (stats,
4358 gettext_noop ("# requests dropped due to higher-TTL request"),
4359 1,
4360 GNUNET_NO);
4361 return GNUNET_OK;
4362 }
4363 else
4364 {
4365 /* existing request has lower TTL, drop old one! */
4366 pr->priority += cdc.have->priority;
4367 /* Possible optimization: if we have applicable pending
4368 replies in 'cdc.have', we might want to move those over
4369 (this is a really rare special-case, so it is not clear
4370 that this would be worth it) */
4371 destroy_pending_request (cdc.have);
4372 /* keep processing 'pr'! */
4373 }
4374 }
4375
4376 pr->cp = cp;
4377 GNUNET_break (GNUNET_OK ==
4378 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4379 &gm->query,
4380 pr,
4381 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4382 GNUNET_break (GNUNET_OK ==
4383 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4384 &other->hashPubKey,
4385 pr,
4386 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4387 437
4388 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, 438 GSF_plan_add_ (cp, pr);
4389 pr, 439 return GNUNET_YES;
4390 pr->start_time.abs_value + pr->ttl);
4391
4392 GNUNET_STATISTICS_update (stats,
4393 gettext_noop ("# P2P searches received"),
4394 1,
4395 GNUNET_NO);
4396 GNUNET_STATISTICS_update (stats,
4397 gettext_noop ("# P2P searches active"),
4398 1,
4399 GNUNET_NO);
4400
4401 /* calculate change in traffic preference */
4402 cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4403 /* process locally */
4404 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4405 type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4406 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4407 (pr->priority + 1));
4408 if (GNUNET_YES != pr->forward_only)
4409 {
4410#if DEBUG_FS
4411 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4412 "Handing request for `%s' to datastore\n",
4413 GNUNET_h2s (&gm->query));
4414#endif
4415 pr->qe = GNUNET_DATASTORE_iterate_key (dsh,
4416 &gm->query,
4417 type,
4418 pr->priority + 1,
4419 MAX_DATASTORE_QUEUE,
4420 timeout,
4421 &process_local_reply,
4422 pr);
4423 if (NULL == pr->qe)
4424 {
4425 GNUNET_STATISTICS_update (stats,
4426 gettext_noop ("# requests dropped by datastore (queue length limit)"),
4427 1,
4428 GNUNET_NO);
4429 }
4430 }
4431 else
4432 {
4433 GNUNET_STATISTICS_update (stats,
4434 gettext_noop ("# requests forwarded due to high load"),
4435 1,
4436 GNUNET_NO);
4437 }
4438
4439 /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
4440 switch (pr->type)
4441 {
4442 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4443 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4444 /* only one result, wait for datastore */
4445 if (GNUNET_YES != pr->forward_only)
4446 {
4447 GNUNET_STATISTICS_update (stats,
4448 gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4449 1,
4450 GNUNET_NO);
4451 break;
4452 }
4453 default:
4454 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4455 pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
4456 pr);
4457 }
4458
4459 /* make sure we don't track too many requests */
4460 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4461 {
4462 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4463 GNUNET_assert (pr != NULL);
4464 destroy_pending_request (pr);
4465 }
4466 return GNUNET_OK;
4467} 440}
4468 441
4469 442
4470/* **************************** CS GET Handling ************************ */
4471
4472
4473/** 443/**
4474 * Handle START_SEARCH-message (search request from client). 444 * Method called whenever a given peer connects.
4475 * 445 *
4476 * @param cls closure 446 * @param cls closure, not used
4477 * @param client identification of the client 447 * @param peer peer identity this notification is about
4478 * @param message the actual message 448 * @param atsi performance information
4479 */ 449 */
4480static void 450static void
4481handle_start_search (void *cls, 451peer_connect_handler (void *cls,
4482 struct GNUNET_SERVER_Client *client, 452 const struct GNUNET_PeerIdentity *peer,
4483 const struct GNUNET_MessageHeader *message) 453 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4484{ 454{
4485 static GNUNET_HashCode all_zeros; 455 struct GSF_ConnectedPeer *cp;
4486 const struct SearchMessage *sm;
4487 struct ClientList *cl;
4488 struct ClientRequestList *crl;
4489 struct PendingRequest *pr;
4490 uint16_t msize;
4491 unsigned int sc;
4492 enum GNUNET_BLOCK_Type type;
4493 456
4494 msize = ntohs (message->size); 457 if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity)))
4495 if ( (msize < sizeof (struct SearchMessage)) || 458 return;
4496 (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) ) 459 cp = GSF_peer_connect_handler_ (peer, atsi);
4497 { 460 if (NULL == cp)
4498 GNUNET_break (0); 461 return;
4499 GNUNET_SERVER_receive_done (client, 462 GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
4500 GNUNET_SYSERR); 463 cp);
4501 return;
4502 }
4503 GNUNET_STATISTICS_update (stats,
4504 gettext_noop ("# client searches received"),
4505 1,
4506 GNUNET_NO);
4507 sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4508 sm = (const struct SearchMessage*) message;
4509 type = ntohl (sm->type);
4510#if DEBUG_FS
4511 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4512 "Received request for `%s' of type %u from local client\n",
4513 GNUNET_h2s (&sm->query),
4514 (unsigned int) type);
4515#endif
4516 cl = client_list;
4517 while ( (cl != NULL) &&
4518 (cl->client != client) )
4519 cl = cl->next;
4520 if (cl == NULL)
4521 {
4522 cl = GNUNET_malloc (sizeof (struct ClientList));
4523 cl->client = client;
4524 GNUNET_SERVER_client_keep (client);
4525 cl->next = client_list;
4526 client_list = cl;
4527 }
4528 /* detect duplicate KBLOCK requests */
4529 if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4530 (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4531 (type == GNUNET_BLOCK_TYPE_ANY) )
4532 {
4533 crl = cl->rl_head;
4534 while ( (crl != NULL) &&
4535 ( (0 != memcmp (&crl->req->query,
4536 &sm->query,
4537 sizeof (GNUNET_HashCode))) ||
4538 (crl->req->type != type) ) )
4539 crl = crl->next;
4540 if (crl != NULL)
4541 {
4542#if DEBUG_FS
4543 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4544 "Have existing request, merging content-seen lists.\n");
4545#endif
4546 pr = crl->req;
4547 /* Duplicate request (used to send long list of
4548 known/blocked results); merge 'pr->replies_seen'
4549 and update bloom filter */
4550 GNUNET_array_grow (pr->replies_seen,
4551 pr->replies_seen_size,
4552 pr->replies_seen_off + sc);
4553 memcpy (&pr->replies_seen[pr->replies_seen_off],
4554 &sm[1],
4555 sc * sizeof (GNUNET_HashCode));
4556 pr->replies_seen_off += sc;
4557 refresh_bloomfilter (pr);
4558 GNUNET_STATISTICS_update (stats,
4559 gettext_noop ("# client searches updated (merged content seen list)"),
4560 1,
4561 GNUNET_NO);
4562 GNUNET_SERVER_receive_done (client,
4563 GNUNET_OK);
4564 return;
4565 }
4566 }
4567 GNUNET_STATISTICS_update (stats,
4568 gettext_noop ("# client searches active"),
4569 1,
4570 GNUNET_NO);
4571 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
4572 ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4573 crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4574 memset (crl, 0, sizeof (struct ClientRequestList));
4575 crl->client_list = cl;
4576 GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4577 cl->rl_tail,
4578 crl);
4579 crl->req = pr;
4580 pr->type = type;
4581 pr->client_request_list = crl;
4582 GNUNET_array_grow (pr->replies_seen,
4583 pr->replies_seen_size,
4584 sc);
4585 memcpy (pr->replies_seen,
4586 &sm[1],
4587 sc * sizeof (GNUNET_HashCode));
4588 pr->replies_seen_off = sc;
4589 pr->anonymity_level = ntohl (sm->anonymity_level);
4590 pr->start_time = GNUNET_TIME_absolute_get ();
4591 refresh_bloomfilter (pr);
4592 pr->query = sm->query;
4593 if (0 == (1 & ntohl (sm->options)))
4594 pr->local_only = GNUNET_NO;
4595 else
4596 pr->local_only = GNUNET_YES;
4597 switch (type)
4598 {
4599 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4600 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4601 if (0 != memcmp (&sm->target,
4602 &all_zeros,
4603 sizeof (GNUNET_HashCode)))
4604 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4605 break;
4606 case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4607 pr->namespace = (GNUNET_HashCode*) &pr[1];
4608 memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4609 break;
4610 default:
4611 break;
4612 }
4613 GNUNET_break (GNUNET_OK ==
4614 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4615 &sm->query,
4616 pr,
4617 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4618 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4619 type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4620 pr->qe = GNUNET_DATASTORE_iterate_key (dsh,
4621 &sm->query,
4622 type,
4623 -3, -1,
4624 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
4625 &process_local_reply,
4626 pr);
4627} 464}
4628 465
4629 466
4630/* **************************** Startup ************************ */
4631
4632
4633
4634/** 467/**
4635 * Function called after GNUNET_CORE_connect has succeeded 468 * Function called after GNUNET_CORE_connect has succeeded
4636 * (or failed for good). Note that the private key of the 469 * (or failed for good). Note that the private key of the
@@ -4656,8 +489,6 @@ peer_init_handler (void *cls,
4656} 489}
4657 490
4658 491
4659
4660
4661/** 492/**
4662 * Process fs requests. 493 * Process fs requests.
4663 * 494 *
@@ -4674,7 +505,7 @@ main_init (struct GNUNET_SERVER_Handle *server,
4674 GNUNET_MESSAGE_TYPE_FS_GET, 0 }, 505 GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4675 { &handle_p2p_put, 506 { &handle_p2p_put,
4676 GNUNET_MESSAGE_TYPE_FS_PUT, 0 }, 507 GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4677 { &handle_p2p_migration_stop, 508 { &GSF_handle_p2p_migration_stop_,
4678 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP, 509 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4679 sizeof (struct MigrationStopMessage) }, 510 sizeof (struct MigrationStopMessage) },
4680 { NULL, 0, 0 } 511 { NULL, 0, 0 }
@@ -4690,91 +521,32 @@ main_init (struct GNUNET_SERVER_Handle *server,
4690 0 }, 521 0 },
4691 {NULL, NULL, 0, 0} 522 {NULL, NULL, 0, 0}
4692 }; 523 };
4693 unsigned long long enc = 128;
4694 524
4695 cfg = c; 525 GSF_core = GNUNET_CORE_connect (GSF_cfg,
4696 stats = GNUNET_STATISTICS_create ("fs", cfg); 526 2, /* larger? */
4697 min_migration_delay = GNUNET_TIME_UNIT_SECONDS; 527 NULL,
4698 if ( (GNUNET_OK != 528 &peer_init_handler,
4699 GNUNET_CONFIGURATION_get_value_number (cfg, 529 &peer_connect_handler,
4700 "fs", 530 &GSF_peer_disconnect_handler_,
4701 "MAX_PENDING_REQUESTS", 531 &GSF_peer_status_handler_,
4702 &max_pending_requests)) || 532 NULL, GNUNET_NO,
4703 (GNUNET_OK != 533 NULL, GNUNET_NO,
4704 GNUNET_CONFIGURATION_get_value_number (cfg, 534 p2p_handlers);
4705 "fs", 535 if (NULL == GSF_core)
4706 "EXPECTED_NEIGHBOUR_COUNT",
4707 &enc)) ||
4708 (GNUNET_OK !=
4709 GNUNET_CONFIGURATION_get_value_time (cfg,
4710 "fs",
4711 "MIN_MIGRATION_DELAY",
4712 &min_migration_delay)) )
4713 {
4714 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4715 _("Configuration fails to specify certain parameters, assuming default values."));
4716 }
4717 connected_peers = GNUNET_CONTAINER_multihashmap_create (enc);
4718 query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4719 rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
4720 peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4721 requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4722 core = GNUNET_CORE_connect (cfg,
4723 1, /* larger? */
4724 NULL,
4725 &peer_init_handler,
4726 &peer_connect_handler,
4727 &peer_disconnect_handler,
4728 &peer_status_handler,
4729 NULL, GNUNET_NO,
4730 NULL, GNUNET_NO,
4731 p2p_handlers);
4732 if (NULL == core)
4733 { 536 {
4734 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 537 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4735 _("Failed to connect to `%s' service.\n"), 538 _("Failed to connect to `%s' service.\n"),
4736 "core"); 539 "core");
4737 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4738 connected_peers = NULL;
4739 GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4740 query_request_map = NULL;
4741 GNUNET_LOAD_value_free (rt_entry_lifetime);
4742 rt_entry_lifetime = NULL;
4743 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4744 requests_by_expiration_heap = NULL;
4745 GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4746 peer_request_map = NULL;
4747 if (dsh != NULL)
4748 {
4749 GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4750 dsh = NULL;
4751 }
4752 return GNUNET_SYSERR; 540 return GNUNET_SYSERR;
4753 } 541 }
4754 if (active_from_migration)
4755 {
4756 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4757 _("Content migration is enabled, will start to gather data\n"));
4758 consider_migration_gathering ();
4759 }
4760 consider_dht_put_gathering (NULL);
4761 GNUNET_SERVER_disconnect_notify (server, 542 GNUNET_SERVER_disconnect_notify (server,
4762 &handle_client_disconnect, 543 &GSF_client_disconnect_handler_,
4763 NULL); 544 NULL);
4764 GNUNET_assert (GNUNET_OK ==
4765 GNUNET_CONFIGURATION_get_value_filename (cfg,
4766 "fs",
4767 "TRUST",
4768 &trustDirectory));
4769 GNUNET_DISK_directory_create (trustDirectory);
4770 GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
4771 &cron_flush_trust, NULL);
4772
4773
4774 GNUNET_SERVER_add_handlers (server, handlers); 545 GNUNET_SERVER_add_handlers (server, handlers);
4775 cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, 546 cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
4776 &age_cover_counters, 547 &age_cover_counters,
4777 NULL); 548 NULL);
549 datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4778 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, 550 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
4779 &shutdown_task, 551 &shutdown_task,
4780 NULL); 552 NULL);
@@ -4794,45 +566,35 @@ run (void *cls,
4794 struct GNUNET_SERVER_Handle *server, 566 struct GNUNET_SERVER_Handle *server,
4795 const struct GNUNET_CONFIGURATION_Handle *cfg) 567 const struct GNUNET_CONFIGURATION_Handle *cfg)
4796{ 568{
4797 active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg, 569 GSF_cfg = cfg;
4798 "FS", 570 GSF_dsh = GNUNET_DATASTORE_connect (cfg);
4799 "CONTENT_CACHING"); 571 if (NULL == GSF_dsh)
4800 active_from_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4801 "FS",
4802 "CONTENT_PUSHING");
4803 dsh = GNUNET_DATASTORE_connect (cfg);
4804 if (dsh == NULL)
4805 { 572 {
4806 GNUNET_SCHEDULER_shutdown (); 573 GNUNET_SCHEDULER_shutdown ();
4807 return; 574 return;
4808 } 575 }
4809 datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); 576 GSF_rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
4810 datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); 577 GSF_stats = GNUNET_STATISTICS_create ("fs", cfg);
4811 block_cfg = GNUNET_CONFIGURATION_create (); 578 block_cfg = GNUNET_CONFIGURATION_create ();
4812 GNUNET_CONFIGURATION_set_value_string (block_cfg, 579 GNUNET_CONFIGURATION_set_value_string (block_cfg,
4813 "block", 580 "block",
4814 "PLUGINS", 581 "PLUGINS",
4815 "fs"); 582 "fs");
4816 block_ctx = GNUNET_BLOCK_context_create (block_cfg); 583 GSF_block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4817 GNUNET_assert (NULL != block_ctx); 584 GNUNET_assert (NULL != GSF_block_ctx);
4818 dht_handle = GNUNET_DHT_connect (cfg, 585 GSF_dht = GNUNET_DHT_connect (cfg,
4819 FS_DHT_HT_SIZE); 586 FS_DHT_HT_SIZE);
4820 if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, dsh)) || 587 GSF_plan_init ();
588 GSF_pending_request_init_ ();
589 GSF_connected_peer_init_ ();
590 GSF_push_init_ ();
591 GSF_put_init_ ();
592 if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, GSF_dsh)) ||
593
4821 (GNUNET_OK != main_init (server, cfg)) ) 594 (GNUNET_OK != main_init (server, cfg)) )
4822 { 595 {
4823 GNUNET_SCHEDULER_shutdown (); 596 GNUNET_SCHEDULER_shutdown ();
4824 GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO); 597 shutdown_task (NULL, NULL);
4825 dsh = NULL;
4826 GNUNET_DHT_disconnect (dht_handle);
4827 dht_handle = NULL;
4828 GNUNET_BLOCK_context_destroy (block_ctx);
4829 block_ctx = NULL;
4830 GNUNET_CONFIGURATION_destroy (block_cfg);
4831 block_cfg = NULL;
4832 GNUNET_LOAD_value_free (datastore_get_load);
4833 datastore_get_load = NULL;
4834 GNUNET_LOAD_value_free (datastore_put_load);
4835 datastore_put_load = NULL;
4836 return; 598 return;
4837 } 599 }
4838} 600}