diff options
Diffstat (limited to 'src/fs')
-rw-r--r-- | src/fs/Makefile.am | 22 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs.c | 4726 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_new.c | 621 |
3 files changed, 247 insertions, 5122 deletions
diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am index 94970019f..3dc5d64eb 100644 --- a/src/fs/Makefile.am +++ b/src/fs/Makefile.am | |||
@@ -54,7 +54,6 @@ bin_PROGRAMS = \ | |||
54 | gnunet-pseudonym \ | 54 | gnunet-pseudonym \ |
55 | gnunet-search \ | 55 | gnunet-search \ |
56 | gnunet-service-fs \ | 56 | gnunet-service-fs \ |
57 | gnunet-service-fs-new \ | ||
58 | gnunet-unindex | 57 | gnunet-unindex |
59 | 58 | ||
60 | gnunet_directory_SOURCES = \ | 59 | gnunet_directory_SOURCES = \ |
@@ -106,8 +105,8 @@ gnunet_search_LDADD = \ | |||
106 | gnunet_search_DEPENDENCIES = \ | 105 | gnunet_search_DEPENDENCIES = \ |
107 | libgnunetfs.la | 106 | libgnunetfs.la |
108 | 107 | ||
109 | gnunet_service_fs_new_SOURCES = \ | 108 | gnunet_service_fs_SOURCES = \ |
110 | gnunet-service-fs_new.c gnunet-service-fs.h \ | 109 | gnunet-service-fs.c gnunet-service-fs.h \ |
111 | gnunet-service-fs_cp.c gnunet-service-fs_cp.h \ | 110 | gnunet-service-fs_cp.c gnunet-service-fs_cp.h \ |
112 | gnunet-service-fs_indexing.c gnunet-service-fs_indexing.h \ | 111 | gnunet-service-fs_indexing.c gnunet-service-fs_indexing.h \ |
113 | gnunet-service-fs_lc.c gnunet-service-fs_lc.h \ | 112 | gnunet-service-fs_lc.c gnunet-service-fs_lc.h \ |
@@ -115,21 +114,6 @@ gnunet_service_fs_new_SOURCES = \ | |||
115 | gnunet-service-fs_pr.c gnunet-service-fs_pr.h \ | 114 | gnunet-service-fs_pr.c gnunet-service-fs_pr.h \ |
116 | gnunet-service-fs_push.c gnunet-service-fs_push.h \ | 115 | gnunet-service-fs_push.c gnunet-service-fs_push.h \ |
117 | gnunet-service-fs_put.c gnunet-service-fs_put.h | 116 | gnunet-service-fs_put.c gnunet-service-fs_put.h |
118 | gnunet_service_fs_new_LDADD = \ | ||
119 | $(top_builddir)/src/fs/libgnunetfs.la \ | ||
120 | $(top_builddir)/src/dht/libgnunetdht.la \ | ||
121 | $(top_builddir)/src/block/libgnunetblock.la \ | ||
122 | $(top_builddir)/src/datastore/libgnunetdatastore.la \ | ||
123 | $(top_builddir)/src/statistics/libgnunetstatistics.la \ | ||
124 | $(top_builddir)/src/core/libgnunetcore.la \ | ||
125 | $(top_builddir)/src/util/libgnunetutil.la \ | ||
126 | $(GN_LIBINTL) | ||
127 | gnunet_service_fs_new_DEPENDENCIES = \ | ||
128 | libgnunetfs.la | ||
129 | |||
130 | gnunet_service_fs_SOURCES = \ | ||
131 | gnunet-service-fs.c gnunet-service-fs.h \ | ||
132 | gnunet-service-fs_indexing.c gnunet-service-fs_indexing.h | ||
133 | gnunet_service_fs_LDADD = \ | 117 | gnunet_service_fs_LDADD = \ |
134 | $(top_builddir)/src/fs/libgnunetfs.la \ | 118 | $(top_builddir)/src/fs/libgnunetfs.la \ |
135 | $(top_builddir)/src/dht/libgnunetdht.la \ | 119 | $(top_builddir)/src/dht/libgnunetdht.la \ |
@@ -139,7 +123,7 @@ gnunet_service_fs_LDADD = \ | |||
139 | $(top_builddir)/src/core/libgnunetcore.la \ | 123 | $(top_builddir)/src/core/libgnunetcore.la \ |
140 | $(top_builddir)/src/util/libgnunetutil.la \ | 124 | $(top_builddir)/src/util/libgnunetutil.la \ |
141 | $(GN_LIBINTL) | 125 | $(GN_LIBINTL) |
142 | gnunet_service_fs_DEPENDENCIES = \ | 126 | gnunet_service_fs_new_DEPENDENCIES = \ |
143 | libgnunetfs.la | 127 | libgnunetfs.la |
144 | 128 | ||
145 | gnunet_unindex_SOURCES = \ | 129 | gnunet_unindex_SOURCES = \ |
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 35d89c50f..20a98e6f2 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | (C) 2009, 2010 Christian Grothoff (and other contributing authors) | 3 | (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors) |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -22,6 +22,11 @@ | |||
22 | * @file fs/gnunet-service-fs.c | 22 | * @file fs/gnunet-service-fs.c |
23 | * @brief gnunet anonymity protocol implementation | 23 | * @brief gnunet anonymity protocol implementation |
24 | * @author Christian Grothoff | 24 | * @author Christian Grothoff |
25 | * | ||
26 | * To use: | ||
27 | * - consider re-issue GSF_dht_lookup_ after non-DHT reply received | ||
28 | * - implement 'SUPPORT_DELAYS' | ||
29 | * | ||
25 | */ | 30 | */ |
26 | #include "platform.h" | 31 | #include "platform.h" |
27 | #include <float.h> | 32 | #include <float.h> |
@@ -36,30 +41,15 @@ | |||
36 | #include "gnunet_statistics_service.h" | 41 | #include "gnunet_statistics_service.h" |
37 | #include "gnunet_transport_service.h" | 42 | #include "gnunet_transport_service.h" |
38 | #include "gnunet_util_lib.h" | 43 | #include "gnunet_util_lib.h" |
44 | #include "gnunet-service-fs_cp.h" | ||
39 | #include "gnunet-service-fs_indexing.h" | 45 | #include "gnunet-service-fs_indexing.h" |
46 | #include "gnunet-service-fs_lc.h" | ||
47 | #include "gnunet-service-fs_pe.h" | ||
48 | #include "gnunet-service-fs_pr.h" | ||
49 | #include "gnunet-service-fs_push.h" | ||
50 | #include "gnunet-service-fs_put.h" | ||
40 | #include "fs.h" | 51 | #include "fs.h" |
41 | 52 | ||
42 | #define DEBUG_FS GNUNET_YES | ||
43 | |||
44 | /** | ||
45 | * Should we introduce random latency in processing? Required for proper | ||
46 | * implementation of GAP, but can be disabled for performance evaluation of | ||
47 | * the basic routing algorithm. | ||
48 | * | ||
49 | * Note that with delays enabled, performance can be significantly lower | ||
50 | * (several orders of magnitude in 2-peer test runs); if you want to | ||
51 | * measure throughput of other components, set this to NO. Also, you | ||
52 | * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for | ||
53 | * a rather wasteful mode of operation (that might still get the highest | ||
54 | * throughput overall). | ||
55 | * | ||
56 | * Performance measurements (for 50 MB file, 2 peers): | ||
57 | * | ||
58 | * - Without delays: 3300 kb/s | ||
59 | * - With delays: 101 kb/s | ||
60 | */ | ||
61 | #define SUPPORT_DELAYS GNUNET_NO | ||
62 | |||
63 | /** | 53 | /** |
64 | * Size for the hash map for DHT requests from the FS | 54 | * Size for the hash map for DHT requests from the FS |
65 | * service. Should be about the number of concurrent | 55 | * service. Should be about the number of concurrent |
@@ -67,17 +57,6 @@ | |||
67 | */ | 57 | */ |
68 | #define FS_DHT_HT_SIZE 1024 | 58 | #define FS_DHT_HT_SIZE 1024 |
69 | 59 | ||
70 | /** | ||
71 | * At what frequency should our datastore load decrease | ||
72 | * automatically (since if we don't use it, clearly the | ||
73 | * load must be going down). | ||
74 | */ | ||
75 | #define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250) | ||
76 | |||
77 | /** | ||
78 | * How often do we flush trust values to disk? | ||
79 | */ | ||
80 | #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) | ||
81 | 60 | ||
82 | /** | 61 | /** |
83 | * How quickly do we age cover traffic? At the given | 62 | * How quickly do we age cover traffic? At the given |
@@ -86,830 +65,33 @@ | |||
86 | */ | 65 | */ |
87 | #define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) | 66 | #define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) |
88 | 67 | ||
89 | /** | ||
90 | * How often do we at most PUT content into the DHT? | ||
91 | */ | ||
92 | #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) | ||
93 | |||
94 | /** | ||
95 | * How long must content remain valid for us to consider it for migration? | ||
96 | * If content will expire too soon, there is clearly no point in pushing | ||
97 | * it to other peers. This value gives the threshold for migration. Note | ||
98 | * that if this value is increased, the migration testcase may need to be | ||
99 | * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c). | ||
100 | */ | ||
101 | #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30) | ||
102 | |||
103 | /** | ||
104 | * Inverse of the probability that we will submit the same query | ||
105 | * to the same peer again. If the same peer already got the query | ||
106 | * repeatedly recently, the probability is multiplied by the inverse | ||
107 | * of this number each time. Note that we only try about every TTL_DECREMENT/2 | ||
108 | * plus MAX_CORK_DELAY (so roughly every 3.5s). | ||
109 | * | ||
110 | * Note that this factor is a key influence to performance in small | ||
111 | * networks (especially test networks of 2 peers) because if there is | ||
112 | * only a single peer with the data, this value will determine how | ||
113 | * soon we might re-try. For example, a value of 3 can result in | ||
114 | * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would | ||
115 | * give us 5 MB/s. OTOH, obviously re-trying the same peer can be | ||
116 | * rather inefficient in larger networks, hence picking 1 is in | ||
117 | * general not the best choice. | ||
118 | * | ||
119 | * Performance measurements (for 50 MB file, 2 peers, no delays): | ||
120 | * | ||
121 | * - 1: 3300 kb/s (consistently) | ||
122 | * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s | ||
123 | * - 5: 759 kb/s, 968 kb/s, 1160 kb/s | ||
124 | * | ||
125 | * Note that this does NOT mean that the value should be 1 since | ||
126 | * a 2-peer network is far from representative here (and this fails | ||
127 | * to take into consideration bandwidth wasted by repeatedly | ||
128 | * sending queries to peers that don't have the content). Also, | ||
129 | * it is expected that higher values lead to more inconsistent | ||
130 | * measurements since this only affects lost messages towards the | ||
131 | * end of the download. | ||
132 | * | ||
133 | * Finally, we should probably consider changing this and making | ||
134 | * it dependent on the number of connected peers or a related | ||
135 | * metric (bad magic constants...). | ||
136 | */ | ||
137 | #define RETRY_PROBABILITY_INV 1 | ||
138 | |||
139 | /** | ||
140 | * What is the maximum delay for a P2P FS message (in our interaction | ||
141 | * with core)? FS-internal delays are another story. The value is | ||
142 | * chosen based on the 32k block size. Given that peers typcially | ||
143 | * have at least 1 kb/s bandwidth, 45s waits give us a chance to | ||
144 | * transmit one message even to the lowest-bandwidth peers. | ||
145 | */ | ||
146 | #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45) | ||
147 | |||
148 | /** | ||
149 | * Maximum number of requests (from other peers, overall) that we're | ||
150 | * willing to have pending at any given point in time. Can be changed | ||
151 | * via the configuration file (32k is just the default). | ||
152 | */ | ||
153 | static unsigned long long max_pending_requests = (32 * 1024); | ||
154 | |||
155 | /** | ||
156 | * Information we keep for each pending reply. The | ||
157 | * actual message follows at the end of this struct. | ||
158 | */ | ||
159 | struct PendingMessage; | ||
160 | |||
161 | /** | ||
162 | * Function called upon completion of a transmission. | ||
163 | * | ||
164 | * @param cls closure | ||
165 | * @param pid ID of receiving peer, 0 on transmission error | ||
166 | */ | ||
167 | typedef void (*TransmissionContinuation)(void * cls, | ||
168 | GNUNET_PEER_Id tpid); | ||
169 | |||
170 | |||
171 | /** | ||
172 | * Information we keep for each pending message (GET/PUT). The | ||
173 | * actual message follows at the end of this struct. | ||
174 | */ | ||
175 | struct PendingMessage | ||
176 | { | ||
177 | /** | ||
178 | * This is a doubly-linked list of messages to the same peer. | ||
179 | */ | ||
180 | struct PendingMessage *next; | ||
181 | |||
182 | /** | ||
183 | * This is a doubly-linked list of messages to the same peer. | ||
184 | */ | ||
185 | struct PendingMessage *prev; | ||
186 | |||
187 | /** | ||
188 | * Entry in pending message list for this pending message. | ||
189 | */ | ||
190 | struct PendingMessageList *pml; | ||
191 | |||
192 | /** | ||
193 | * Function to call immediately once we have transmitted this | ||
194 | * message. | ||
195 | */ | ||
196 | TransmissionContinuation cont; | ||
197 | |||
198 | /** | ||
199 | * Closure for cont. | ||
200 | */ | ||
201 | void *cont_cls; | ||
202 | |||
203 | /** | ||
204 | * Do not transmit this pending message until this deadline. | ||
205 | */ | ||
206 | struct GNUNET_TIME_Absolute delay_until; | ||
207 | |||
208 | /** | ||
209 | * Size of the reply; actual reply message follows | ||
210 | * at the end of this struct. | ||
211 | */ | ||
212 | size_t msize; | ||
213 | |||
214 | /** | ||
215 | * How important is this message for us? | ||
216 | */ | ||
217 | uint32_t priority; | ||
218 | |||
219 | }; | ||
220 | |||
221 | |||
222 | /** | ||
223 | * Information about a peer that we are connected to. | ||
224 | * We track data that is useful for determining which | ||
225 | * peers should receive our requests. We also keep | ||
226 | * a list of messages to transmit to this peer. | ||
227 | */ | ||
228 | struct ConnectedPeer | ||
229 | { | ||
230 | |||
231 | /** | ||
232 | * List of the last clients for which this peer successfully | ||
233 | * answered a query. | ||
234 | */ | ||
235 | struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE]; | ||
236 | |||
237 | /** | ||
238 | * List of the last PIDs for which | ||
239 | * this peer successfully answered a query; | ||
240 | * We use 0 to indicate no successful reply. | ||
241 | */ | ||
242 | GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE]; | ||
243 | |||
244 | /** | ||
245 | * Average delay between sending the peer a request and | ||
246 | * getting a reply (only calculated over the requests for | ||
247 | * which we actually got a reply). Calculated | ||
248 | * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n | ||
249 | */ | ||
250 | struct GNUNET_TIME_Relative avg_delay; | ||
251 | |||
252 | /** | ||
253 | * Point in time until which this peer does not want us to migrate content | ||
254 | * to it. | ||
255 | */ | ||
256 | struct GNUNET_TIME_Absolute migration_blocked; | ||
257 | |||
258 | /** | ||
259 | * Time until when we blocked this peer from migrating | ||
260 | * data to us. | ||
261 | */ | ||
262 | struct GNUNET_TIME_Absolute last_migration_block; | ||
263 | |||
264 | /** | ||
265 | * Transmission times for the last MAX_QUEUE_PER_PEER | ||
266 | * requests for this peer. Used as a ring buffer, current | ||
267 | * offset is stored in 'last_request_times_off'. If the | ||
268 | * oldest entry is more recent than the 'avg_delay', we should | ||
269 | * not send any more requests right now. | ||
270 | */ | ||
271 | struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER]; | ||
272 | |||
273 | /** | ||
274 | * Handle for an active request for transmission to this | ||
275 | * peer, or NULL. | ||
276 | */ | ||
277 | struct GNUNET_CORE_TransmitHandle *cth; | ||
278 | |||
279 | /** | ||
280 | * Messages (replies, queries, content migration) we would like to | ||
281 | * send to this peer in the near future. Sorted by priority, head. | ||
282 | */ | ||
283 | struct PendingMessage *pending_messages_head; | ||
284 | |||
285 | /** | ||
286 | * Messages (replies, queries, content migration) we would like to | ||
287 | * send to this peer in the near future. Sorted by priority, tail. | ||
288 | */ | ||
289 | struct PendingMessage *pending_messages_tail; | ||
290 | |||
291 | /** | ||
292 | * How long does it typically take for us to transmit a message | ||
293 | * to this peer? (delay between the request being issued and | ||
294 | * the callback being invoked). | ||
295 | */ | ||
296 | struct GNUNET_LOAD_Value *transmission_delay; | ||
297 | |||
298 | /** | ||
299 | * Context of our GNUNET_CORE_peer_change_preference call (or NULL). | ||
300 | */ | ||
301 | struct GNUNET_CORE_InformationRequestContext *irc; | ||
302 | |||
303 | /** | ||
304 | * Request for which 'irc' is currently active (or NULL). | ||
305 | */ | ||
306 | struct PendingRequest *pr; | ||
307 | |||
308 | /** | ||
309 | * Time when the last transmission request was issued. | ||
310 | */ | ||
311 | struct GNUNET_TIME_Absolute last_transmission_request_start; | ||
312 | |||
313 | /** | ||
314 | * ID of delay task for scheduling transmission. | ||
315 | */ | ||
316 | GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; | ||
317 | |||
318 | /** | ||
319 | * Average priority of successful replies. Calculated | ||
320 | * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n | ||
321 | */ | ||
322 | double avg_priority; | ||
323 | |||
324 | /** | ||
325 | * Increase in traffic preference still to be submitted | ||
326 | * to the core service for this peer. | ||
327 | */ | ||
328 | uint64_t inc_preference; | ||
329 | |||
330 | /** | ||
331 | * Trust rating for this peer | ||
332 | */ | ||
333 | uint32_t trust; | ||
334 | |||
335 | /** | ||
336 | * Trust rating for this peer on disk. | ||
337 | */ | ||
338 | uint32_t disk_trust; | ||
339 | |||
340 | /** | ||
341 | * The peer's identity. | ||
342 | */ | ||
343 | GNUNET_PEER_Id pid; | ||
344 | |||
345 | /** | ||
346 | * Size of the linked list of 'pending_messages'. | ||
347 | */ | ||
348 | unsigned int pending_requests; | ||
349 | |||
350 | /** | ||
351 | * Which offset in "last_p2p_replies" will be updated next? | ||
352 | * (we go round-robin). | ||
353 | */ | ||
354 | unsigned int last_p2p_replies_woff; | ||
355 | |||
356 | /** | ||
357 | * Which offset in "last_client_replies" will be updated next? | ||
358 | * (we go round-robin). | ||
359 | */ | ||
360 | unsigned int last_client_replies_woff; | ||
361 | |||
362 | /** | ||
363 | * Current offset into 'last_request_times' ring buffer. | ||
364 | */ | ||
365 | unsigned int last_request_times_off; | ||
366 | |||
367 | }; | ||
368 | |||
369 | |||
370 | /** | ||
371 | * Information we keep for each pending request. We should try to | ||
372 | * keep this struct as small as possible since its memory consumption | ||
373 | * is key to how many requests we can have pending at once. | ||
374 | */ | ||
375 | struct PendingRequest; | ||
376 | |||
377 | |||
378 | /** | ||
379 | * Doubly-linked list of requests we are performing | ||
380 | * on behalf of the same client. | ||
381 | */ | ||
382 | struct ClientRequestList | ||
383 | { | ||
384 | |||
385 | /** | ||
386 | * This is a doubly-linked list. | ||
387 | */ | ||
388 | struct ClientRequestList *next; | ||
389 | |||
390 | /** | ||
391 | * This is a doubly-linked list. | ||
392 | */ | ||
393 | struct ClientRequestList *prev; | ||
394 | |||
395 | /** | ||
396 | * Request this entry represents. | ||
397 | */ | ||
398 | struct PendingRequest *req; | ||
399 | |||
400 | /** | ||
401 | * Client list this request belongs to. | ||
402 | */ | ||
403 | struct ClientList *client_list; | ||
404 | |||
405 | }; | ||
406 | |||
407 | |||
408 | /** | ||
409 | * Replies to be transmitted to the client. The actual | ||
410 | * response message is allocated after this struct. | ||
411 | */ | ||
412 | struct ClientResponseMessage | ||
413 | { | ||
414 | /** | ||
415 | * This is a doubly-linked list. | ||
416 | */ | ||
417 | struct ClientResponseMessage *next; | ||
418 | |||
419 | /** | ||
420 | * This is a doubly-linked list. | ||
421 | */ | ||
422 | struct ClientResponseMessage *prev; | ||
423 | |||
424 | /** | ||
425 | * Client list entry this response belongs to. | ||
426 | */ | ||
427 | struct ClientList *client_list; | ||
428 | |||
429 | /** | ||
430 | * Number of bytes in the response. | ||
431 | */ | ||
432 | size_t msize; | ||
433 | }; | ||
434 | |||
435 | |||
436 | /** | ||
437 | * Linked list of clients we are performing requests | ||
438 | * for right now. | ||
439 | */ | ||
440 | struct ClientList | ||
441 | { | ||
442 | /** | ||
443 | * This is a linked list. | ||
444 | */ | ||
445 | struct ClientList *next; | ||
446 | |||
447 | /** | ||
448 | * ID of a client. | ||
449 | */ | ||
450 | struct GNUNET_SERVER_Client *client; | ||
451 | |||
452 | /** | ||
453 | * Head of list of requests performed on behalf | ||
454 | * of this client right now. | ||
455 | */ | ||
456 | struct ClientRequestList *rl_head; | ||
457 | |||
458 | /** | ||
459 | * Tail of list of requests performed on behalf | ||
460 | * of this client right now. | ||
461 | */ | ||
462 | struct ClientRequestList *rl_tail; | ||
463 | |||
464 | /** | ||
465 | * Head of linked list of responses. | ||
466 | */ | ||
467 | struct ClientResponseMessage *res_head; | ||
468 | |||
469 | /** | ||
470 | * Tail of linked list of responses. | ||
471 | */ | ||
472 | struct ClientResponseMessage *res_tail; | ||
473 | |||
474 | /** | ||
475 | * Context for sending replies. | ||
476 | */ | ||
477 | struct GNUNET_CONNECTION_TransmitHandle *th; | ||
478 | |||
479 | }; | ||
480 | |||
481 | |||
482 | /** | ||
483 | * Information about a peer that we have forwarded this | ||
484 | * request to already. | ||
485 | */ | ||
486 | struct UsedTargetEntry | ||
487 | { | ||
488 | /** | ||
489 | * What was the last time we have transmitted this request to this | ||
490 | * peer? | ||
491 | */ | ||
492 | struct GNUNET_TIME_Absolute last_request_time; | ||
493 | |||
494 | /** | ||
495 | * How often have we transmitted this request to this peer? | ||
496 | */ | ||
497 | unsigned int num_requests; | ||
498 | |||
499 | /** | ||
500 | * PID of the target peer. | ||
501 | */ | ||
502 | GNUNET_PEER_Id pid; | ||
503 | |||
504 | }; | ||
505 | |||
506 | |||
507 | /** | ||
508 | * Doubly-linked list of messages we are performing | ||
509 | * due to a pending request. | ||
510 | */ | ||
511 | struct PendingMessageList | ||
512 | { | ||
513 | |||
514 | /** | ||
515 | * This is a doubly-linked list of messages on behalf of the same request. | ||
516 | */ | ||
517 | struct PendingMessageList *next; | ||
518 | |||
519 | /** | ||
520 | * This is a doubly-linked list of messages on behalf of the same request. | ||
521 | */ | ||
522 | struct PendingMessageList *prev; | ||
523 | |||
524 | /** | ||
525 | * Message this entry represents. | ||
526 | */ | ||
527 | struct PendingMessage *pm; | ||
528 | |||
529 | /** | ||
530 | * Request this entry belongs to. | ||
531 | */ | ||
532 | struct PendingRequest *req; | ||
533 | |||
534 | /** | ||
535 | * Peer this message is targeted for. | ||
536 | */ | ||
537 | struct ConnectedPeer *target; | ||
538 | |||
539 | }; | ||
540 | |||
541 | |||
542 | /** | ||
543 | * Information we keep for each pending request. We should try to | ||
544 | * keep this struct as small as possible since its memory consumption | ||
545 | * is key to how many requests we can have pending at once. | ||
546 | */ | ||
547 | struct PendingRequest | ||
548 | { | ||
549 | |||
550 | /** | ||
551 | * If this request was made by a client, this is our entry in the | ||
552 | * client request list; otherwise NULL. | ||
553 | */ | ||
554 | struct ClientRequestList *client_request_list; | ||
555 | |||
556 | /** | ||
557 | * Entry of peer responsible for this entry (if this request | ||
558 | * was made by a peer). | ||
559 | */ | ||
560 | struct ConnectedPeer *cp; | ||
561 | |||
562 | /** | ||
563 | * If this is a namespace query, pointer to the hash of the public | ||
564 | * key of the namespace; otherwise NULL. Pointer will be to the | ||
565 | * end of this struct (so no need to free it). | ||
566 | */ | ||
567 | const GNUNET_HashCode *namespace; | ||
568 | |||
569 | /** | ||
570 | * Bloomfilter we use to filter out replies that we don't care about | ||
571 | * (anymore). NULL as long as we are interested in all replies. | ||
572 | */ | ||
573 | struct GNUNET_CONTAINER_BloomFilter *bf; | ||
574 | |||
575 | /** | ||
576 | * Reference to DHT get operation for this request (or NULL). | ||
577 | */ | ||
578 | struct GNUNET_DHT_GetHandle *dht_get; | ||
579 | |||
580 | /** | ||
581 | * Context of our GNUNET_CORE_peer_change_preference call. | ||
582 | */ | ||
583 | struct ConnectedPeer *pirc; | ||
584 | |||
585 | /** | ||
586 | * Hash code of all replies that we have seen so far (only valid | ||
587 | * if client is not NULL since we only track replies like this for | ||
588 | * our own clients). | ||
589 | */ | ||
590 | GNUNET_HashCode *replies_seen; | ||
591 | |||
592 | /** | ||
593 | * Node in the heap representing this entry; NULL | ||
594 | * if we have no heap node. | ||
595 | */ | ||
596 | struct GNUNET_CONTAINER_HeapNode *hnode; | ||
597 | |||
598 | /** | ||
599 | * Head of list of messages being performed on behalf of this | ||
600 | * request. | ||
601 | */ | ||
602 | struct PendingMessageList *pending_head; | ||
603 | |||
604 | /** | ||
605 | * Tail of list of messages being performed on behalf of this | ||
606 | * request. | ||
607 | */ | ||
608 | struct PendingMessageList *pending_tail; | ||
609 | |||
610 | /** | ||
611 | * When did we first see this request (form this peer), or, if our | ||
612 | * client is initiating, when did we last initiate a search? | ||
613 | */ | ||
614 | struct GNUNET_TIME_Absolute start_time; | ||
615 | |||
616 | /** | ||
617 | * The query that this request is for. | ||
618 | */ | ||
619 | GNUNET_HashCode query; | ||
620 | |||
621 | /** | ||
622 | * The task responsible for transmitting queries | ||
623 | * for this request. | ||
624 | */ | ||
625 | GNUNET_SCHEDULER_TaskIdentifier task; | ||
626 | |||
627 | /** | ||
628 | * (Interned) Peer identifier that identifies a preferred target | ||
629 | * for requests. | ||
630 | */ | ||
631 | GNUNET_PEER_Id target_pid; | ||
632 | |||
633 | /** | ||
634 | * (Interned) Peer identifiers of peers that have already | ||
635 | * received our query for this content. | ||
636 | */ | ||
637 | struct UsedTargetEntry *used_targets; | ||
638 | |||
639 | /** | ||
640 | * Our entry in the queue (non-NULL while we wait for our | ||
641 | * turn to interact with the local database). | ||
642 | */ | ||
643 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
644 | |||
645 | /** | ||
646 | * Size of the 'bf' (in bytes). | ||
647 | */ | ||
648 | size_t bf_size; | ||
649 | |||
650 | /** | ||
651 | * Desired anonymity level; only valid for requests from a local client. | ||
652 | */ | ||
653 | uint32_t anonymity_level; | ||
654 | |||
655 | /** | ||
656 | * How many entries in "used_targets" are actually valid? | ||
657 | */ | ||
658 | unsigned int used_targets_off; | ||
659 | |||
660 | /** | ||
661 | * How long is the "used_targets" array? | ||
662 | */ | ||
663 | unsigned int used_targets_size; | ||
664 | 68 | ||
665 | /** | 69 | /* ****************************** globals ****************************** */ |
666 | * Number of results found for this request. | ||
667 | */ | ||
668 | unsigned int results_found; | ||
669 | |||
670 | /** | ||
671 | * How many entries in "replies_seen" are actually valid? | ||
672 | */ | ||
673 | unsigned int replies_seen_off; | ||
674 | |||
675 | /** | ||
676 | * How long is the "replies_seen" array? | ||
677 | */ | ||
678 | unsigned int replies_seen_size; | ||
679 | |||
680 | /** | ||
681 | * Priority with which this request was made. If one of our clients | ||
682 | * made the request, then this is the current priority that we are | ||
683 | * using when initiating the request. This value is used when | ||
684 | * we decide to reward other peers with trust for providing a reply. | ||
685 | */ | ||
686 | uint32_t priority; | ||
687 | |||
688 | /** | ||
689 | * Priority points left for us to spend when forwarding this request | ||
690 | * to other peers. | ||
691 | */ | ||
692 | uint32_t remaining_priority; | ||
693 | |||
694 | /** | ||
695 | * Number to mingle hashes for bloom-filter tests with. | ||
696 | */ | ||
697 | int32_t mingle; | ||
698 | |||
699 | /** | ||
700 | * TTL with which we saw this request (or, if we initiated, TTL that | ||
701 | * we used for the request). | ||
702 | */ | ||
703 | int32_t ttl; | ||
704 | |||
705 | /** | ||
706 | * Type of the content that this request is for. | ||
707 | */ | ||
708 | enum GNUNET_BLOCK_Type type; | ||
709 | |||
710 | /** | ||
711 | * Remove this request after transmission of the current response. | ||
712 | */ | ||
713 | int8_t do_remove; | ||
714 | |||
715 | /** | ||
716 | * GNUNET_YES if we should not forward this request to other peers. | ||
717 | */ | ||
718 | int8_t local_only; | ||
719 | |||
720 | /** | ||
721 | * GNUNET_YES if we should not forward this request to other peers. (HUH?) | ||
722 | */ | ||
723 | int8_t forward_only; | ||
724 | |||
725 | }; | ||
726 | |||
727 | |||
728 | /** | ||
729 | * Block that is ready for migration to other peers. Actual data is at the end of the block. | ||
730 | */ | ||
731 | struct MigrationReadyBlock | ||
732 | { | ||
733 | |||
734 | /** | ||
735 | * This is a doubly-linked list. | ||
736 | */ | ||
737 | struct MigrationReadyBlock *next; | ||
738 | |||
739 | /** | ||
740 | * This is a doubly-linked list. | ||
741 | */ | ||
742 | struct MigrationReadyBlock *prev; | ||
743 | |||
744 | /** | ||
745 | * Query for the block. | ||
746 | */ | ||
747 | GNUNET_HashCode query; | ||
748 | |||
749 | /** | ||
750 | * When does this block expire? | ||
751 | */ | ||
752 | struct GNUNET_TIME_Absolute expiration; | ||
753 | |||
754 | /** | ||
755 | * Peers we would consider forwarding this | ||
756 | * block to. Zero for empty entries. | ||
757 | */ | ||
758 | GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE]; | ||
759 | |||
760 | /** | ||
761 | * Size of the block. | ||
762 | */ | ||
763 | size_t size; | ||
764 | |||
765 | /** | ||
766 | * Number of targets already used. | ||
767 | */ | ||
768 | unsigned int used_targets; | ||
769 | |||
770 | /** | ||
771 | * Type of the block. | ||
772 | */ | ||
773 | enum GNUNET_BLOCK_Type type; | ||
774 | }; | ||
775 | |||
776 | /** | ||
777 | * Identity of this peer. | ||
778 | */ | ||
779 | static struct GNUNET_PeerIdentity my_id; | ||
780 | 70 | ||
781 | /** | 71 | /** |
782 | * Our connection to the datastore. | 72 | * Our connection to the datastore. |
783 | */ | 73 | */ |
784 | static struct GNUNET_DATASTORE_Handle *dsh; | 74 | struct GNUNET_DATASTORE_Handle *GSF_dsh; |
785 | |||
786 | /** | ||
787 | * Our block context. | ||
788 | */ | ||
789 | static struct GNUNET_BLOCK_Context *block_ctx; | ||
790 | |||
791 | /** | ||
792 | * Our block configuration. | ||
793 | */ | ||
794 | static struct GNUNET_CONFIGURATION_Handle *block_cfg; | ||
795 | 75 | ||
796 | /** | 76 | /** |
797 | * Our configuration. | 77 | * Our configuration. |
798 | */ | 78 | */ |
799 | static const struct GNUNET_CONFIGURATION_Handle *cfg; | 79 | const struct GNUNET_CONFIGURATION_Handle *GSF_cfg; |
800 | |||
801 | /** | ||
802 | * Map of peer identifiers to "struct ConnectedPeer" (for that peer). | ||
803 | */ | ||
804 | static struct GNUNET_CONTAINER_MultiHashMap *connected_peers; | ||
805 | |||
806 | /** | ||
807 | * Map of peer identifiers to "struct PendingRequest" (for that peer). | ||
808 | */ | ||
809 | static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map; | ||
810 | |||
811 | /** | ||
812 | * Map of query identifiers to "struct PendingRequest" (for that query). | ||
813 | */ | ||
814 | static struct GNUNET_CONTAINER_MultiHashMap *query_request_map; | ||
815 | |||
816 | /** | ||
817 | * Heap with the request that will expire next at the top. Contains | ||
818 | * pointers of type "struct PendingRequest*"; these will *also* be | ||
819 | * aliased from the "requests_by_peer" data structures and the | ||
820 | * "requests_by_query" table. Note that requests from our clients | ||
821 | * don't expire and are thus NOT in the "requests_by_expiration" | ||
822 | * (or the "requests_by_peer" tables). | ||
823 | */ | ||
824 | static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap; | ||
825 | 80 | ||
826 | /** | 81 | /** |
827 | * Handle for reporting statistics. | 82 | * Handle for reporting statistics. |
828 | */ | 83 | */ |
829 | static struct GNUNET_STATISTICS_Handle *stats; | 84 | struct GNUNET_STATISTICS_Handle *GSF_stats; |
830 | |||
831 | /** | ||
832 | * Linked list of clients we are currently processing requests for. | ||
833 | */ | ||
834 | static struct ClientList *client_list; | ||
835 | |||
836 | /** | ||
837 | * Pointer to handle to the core service (points to NULL until we've | ||
838 | * connected to it). | ||
839 | */ | ||
840 | static struct GNUNET_CORE_Handle *core; | ||
841 | |||
842 | /** | ||
843 | * Head of linked list of blocks that can be migrated. | ||
844 | */ | ||
845 | static struct MigrationReadyBlock *mig_head; | ||
846 | |||
847 | /** | ||
848 | * Tail of linked list of blocks that can be migrated. | ||
849 | */ | ||
850 | static struct MigrationReadyBlock *mig_tail; | ||
851 | |||
852 | /** | ||
853 | * Request to datastore for migration (or NULL). | ||
854 | */ | ||
855 | static struct GNUNET_DATASTORE_QueueEntry *mig_qe; | ||
856 | |||
857 | /** | ||
858 | * Request to datastore for DHT PUTs (or NULL). | ||
859 | */ | ||
860 | static struct GNUNET_DATASTORE_QueueEntry *dht_qe; | ||
861 | |||
862 | /** | ||
863 | * Type we will request for the next DHT PUT round from the datastore. | ||
864 | */ | ||
865 | static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; | ||
866 | |||
867 | /** | ||
868 | * Where do we store trust information? | ||
869 | */ | ||
870 | static char *trustDirectory; | ||
871 | |||
872 | /** | ||
873 | * ID of task that collects blocks for migration. | ||
874 | */ | ||
875 | static GNUNET_SCHEDULER_TaskIdentifier mig_task; | ||
876 | |||
877 | /** | ||
878 | * ID of task that collects blocks for DHT PUTs. | ||
879 | */ | ||
880 | static GNUNET_SCHEDULER_TaskIdentifier dht_task; | ||
881 | |||
882 | /** | ||
883 | * What is the maximum frequency at which we are allowed to | ||
884 | * poll the datastore for migration content? | ||
885 | */ | ||
886 | static struct GNUNET_TIME_Relative min_migration_delay; | ||
887 | 85 | ||
888 | /** | 86 | /** |
889 | * Handle for DHT operations. | 87 | * Handle for DHT operations. |
890 | */ | 88 | */ |
891 | static struct GNUNET_DHT_Handle *dht_handle; | 89 | struct GNUNET_DHT_Handle *GSF_dht; |
892 | |||
893 | /** | ||
894 | * Size of the doubly-linked list of migration blocks. | ||
895 | */ | ||
896 | static unsigned int mig_size; | ||
897 | |||
898 | /** | ||
899 | * Are we allowed to migrate content to this peer. | ||
900 | */ | ||
901 | static int active_to_migration; | ||
902 | 90 | ||
903 | /** | 91 | /** |
904 | * Are we allowed to push out content from this peer. | 92 | * How long do requests typically stay in the routing table? |
905 | */ | ||
906 | static int active_from_migration; | ||
907 | |||
908 | /** | ||
909 | * How many entires with zero anonymity do we currently estimate | ||
910 | * to have in the database? | ||
911 | */ | 93 | */ |
912 | static unsigned int zero_anonymity_count_estimate; | 94 | struct GNUNET_LOAD_Value *GSF_rt_entry_lifetime; |
913 | 95 | ||
914 | /** | 96 | /** |
915 | * Typical priorities we're seeing from other peers right now. Since | 97 | * Typical priorities we're seeing from other peers right now. Since |
@@ -921,1509 +103,87 @@ static unsigned int zero_anonymity_count_estimate; | |||
921 | * receiving the largest possible priority can still only raise our | 103 | * receiving the largest possible priority can still only raise our |
922 | * "current_priorities" by at most 1. | 104 | * "current_priorities" by at most 1. |
923 | */ | 105 | */ |
924 | static double current_priorities; | 106 | double GSF_current_priorities; |
925 | |||
926 | /** | ||
927 | * Datastore 'GET' load tracking. | ||
928 | */ | ||
929 | static struct GNUNET_LOAD_Value *datastore_get_load; | ||
930 | |||
931 | /** | ||
932 | * Datastore 'PUT' load tracking. | ||
933 | */ | ||
934 | static struct GNUNET_LOAD_Value *datastore_put_load; | ||
935 | |||
936 | /** | ||
937 | * How long do requests typically stay in the routing table? | ||
938 | */ | ||
939 | static struct GNUNET_LOAD_Value *rt_entry_lifetime; | ||
940 | 107 | ||
941 | /** | 108 | /** |
942 | * How many query messages have we received 'recently' that | 109 | * How many query messages have we received 'recently' that |
943 | * have not yet been claimed as cover traffic? | 110 | * have not yet been claimed as cover traffic? |
944 | */ | 111 | */ |
945 | static unsigned int cover_query_count; | 112 | unsigned int GSF_cover_query_count; |
946 | 113 | ||
947 | /** | 114 | /** |
948 | * How many content messages have we received 'recently' that | 115 | * How many content messages have we received 'recently' that |
949 | * have not yet been claimed as cover traffic? | 116 | * have not yet been claimed as cover traffic? |
950 | */ | 117 | */ |
951 | static unsigned int cover_content_count; | 118 | unsigned int GSF_cover_content_count; |
952 | |||
953 | /** | ||
954 | * ID of our task that we use to age the cover counters. | ||
955 | */ | ||
956 | static GNUNET_SCHEDULER_TaskIdentifier cover_age_task; | ||
957 | |||
958 | |||
959 | static void | ||
960 | age_cover_counters (void *cls, | ||
961 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
962 | { | ||
963 | cover_content_count = (cover_content_count * 15) / 16; | ||
964 | cover_query_count = (cover_query_count * 15) / 16; | ||
965 | cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, | ||
966 | &age_cover_counters, | ||
967 | NULL); | ||
968 | } | ||
969 | |||
970 | /** | ||
971 | * We've just now completed a datastore request. Update our | ||
972 | * datastore load calculations. | ||
973 | * | ||
974 | * @param start time when the datastore request was issued | ||
975 | */ | ||
976 | static void | ||
977 | update_datastore_delays (struct GNUNET_TIME_Absolute start) | ||
978 | { | ||
979 | struct GNUNET_TIME_Relative delay; | ||
980 | |||
981 | delay = GNUNET_TIME_absolute_get_duration (start); | ||
982 | GNUNET_LOAD_update (datastore_get_load, | ||
983 | delay.rel_value); | ||
984 | } | ||
985 | |||
986 | |||
987 | /** | ||
988 | * Get the filename under which we would store the GNUNET_HELLO_Message | ||
989 | * for the given host and protocol. | ||
990 | * @return filename of the form DIRECTORY/HOSTID | ||
991 | */ | ||
992 | static char * | ||
993 | get_trust_filename (const struct GNUNET_PeerIdentity *id) | ||
994 | { | ||
995 | struct GNUNET_CRYPTO_HashAsciiEncoded fil; | ||
996 | char *fn; | ||
997 | |||
998 | GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil); | ||
999 | GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil); | ||
1000 | return fn; | ||
1001 | } | ||
1002 | |||
1003 | |||
1004 | |||
1005 | /** | ||
1006 | * Transmit messages by copying it to the target buffer | ||
1007 | * "buf". "buf" will be NULL and "size" zero if the socket was closed | ||
1008 | * for writing in the meantime. In that case, do nothing | ||
1009 | * (the disconnect or shutdown handler will take care of the rest). | ||
1010 | * If we were able to transmit messages and there are still more | ||
1011 | * pending, ask core again for further calls to this function. | ||
1012 | * | ||
1013 | * @param cls closure, pointer to the 'struct ConnectedPeer*' | ||
1014 | * @param size number of bytes available in buf | ||
1015 | * @param buf where the callee should write the message | ||
1016 | * @return number of bytes written to buf | ||
1017 | */ | ||
1018 | static size_t | ||
1019 | transmit_to_peer (void *cls, | ||
1020 | size_t size, void *buf); | ||
1021 | |||
1022 | |||
1023 | /* ******************* clean up functions ************************ */ | ||
1024 | |||
1025 | /** | ||
1026 | * Delete the given migration block. | ||
1027 | * | ||
1028 | * @param mb block to delete | ||
1029 | */ | ||
1030 | static void | ||
1031 | delete_migration_block (struct MigrationReadyBlock *mb) | ||
1032 | { | ||
1033 | GNUNET_CONTAINER_DLL_remove (mig_head, | ||
1034 | mig_tail, | ||
1035 | mb); | ||
1036 | GNUNET_PEER_decrement_rcs (mb->target_list, | ||
1037 | MIGRATION_LIST_SIZE); | ||
1038 | mig_size--; | ||
1039 | GNUNET_free (mb); | ||
1040 | } | ||
1041 | |||
1042 | 119 | ||
1043 | /** | 120 | /** |
1044 | * Compare the distance of two peers to a key. | 121 | * Our block context. |
1045 | * | ||
1046 | * @param key key | ||
1047 | * @param p1 first peer | ||
1048 | * @param p2 second peer | ||
1049 | * @return GNUNET_YES if P1 is closer to key than P2 | ||
1050 | */ | ||
1051 | static int | ||
1052 | is_closer (const GNUNET_HashCode *key, | ||
1053 | const struct GNUNET_PeerIdentity *p1, | ||
1054 | const struct GNUNET_PeerIdentity *p2) | ||
1055 | { | ||
1056 | return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey, | ||
1057 | &p2->hashPubKey, | ||
1058 | key); | ||
1059 | } | ||
1060 | |||
1061 | |||
1062 | /** | ||
1063 | * Consider migrating content to a given peer. | ||
1064 | * | ||
1065 | * @param cls 'struct MigrationReadyBlock*' to select | ||
1066 | * targets for (or NULL for none) | ||
1067 | * @param key ID of the peer | ||
1068 | * @param value 'struct ConnectedPeer' of the peer | ||
1069 | * @return GNUNET_YES (always continue iteration) | ||
1070 | */ | ||
1071 | static int | ||
1072 | consider_migration (void *cls, | ||
1073 | const GNUNET_HashCode *key, | ||
1074 | void *value) | ||
1075 | { | ||
1076 | struct MigrationReadyBlock *mb = cls; | ||
1077 | struct ConnectedPeer *cp = value; | ||
1078 | struct MigrationReadyBlock *pos; | ||
1079 | struct GNUNET_PeerIdentity cppid; | ||
1080 | struct GNUNET_PeerIdentity otherpid; | ||
1081 | struct GNUNET_PeerIdentity worstpid; | ||
1082 | size_t msize; | ||
1083 | unsigned int i; | ||
1084 | unsigned int repl; | ||
1085 | |||
1086 | /* consider 'cp' as a migration target for mb */ | ||
1087 | if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0) | ||
1088 | return GNUNET_YES; /* peer has requested no migration! */ | ||
1089 | if (mb != NULL) | ||
1090 | { | ||
1091 | GNUNET_PEER_resolve (cp->pid, | ||
1092 | &cppid); | ||
1093 | repl = MIGRATION_LIST_SIZE; | ||
1094 | for (i=0;i<MIGRATION_LIST_SIZE;i++) | ||
1095 | { | ||
1096 | if (mb->target_list[i] == 0) | ||
1097 | { | ||
1098 | mb->target_list[i] = cp->pid; | ||
1099 | GNUNET_PEER_change_rc (mb->target_list[i], 1); | ||
1100 | repl = MIGRATION_LIST_SIZE; | ||
1101 | break; | ||
1102 | } | ||
1103 | GNUNET_PEER_resolve (mb->target_list[i], | ||
1104 | &otherpid); | ||
1105 | if ( (repl == MIGRATION_LIST_SIZE) && | ||
1106 | is_closer (&mb->query, | ||
1107 | &cppid, | ||
1108 | &otherpid)) | ||
1109 | { | ||
1110 | repl = i; | ||
1111 | worstpid = otherpid; | ||
1112 | } | ||
1113 | else if ( (repl != MIGRATION_LIST_SIZE) && | ||
1114 | (is_closer (&mb->query, | ||
1115 | &worstpid, | ||
1116 | &otherpid) ) ) | ||
1117 | { | ||
1118 | repl = i; | ||
1119 | worstpid = otherpid; | ||
1120 | } | ||
1121 | } | ||
1122 | if (repl != MIGRATION_LIST_SIZE) | ||
1123 | { | ||
1124 | GNUNET_PEER_change_rc (mb->target_list[repl], -1); | ||
1125 | mb->target_list[repl] = cp->pid; | ||
1126 | GNUNET_PEER_change_rc (mb->target_list[repl], 1); | ||
1127 | } | ||
1128 | } | ||
1129 | |||
1130 | /* consider scheduling transmission to cp for content migration */ | ||
1131 | if (cp->cth != NULL) | ||
1132 | return GNUNET_YES; | ||
1133 | msize = 0; | ||
1134 | pos = mig_head; | ||
1135 | while (pos != NULL) | ||
1136 | { | ||
1137 | for (i=0;i<MIGRATION_LIST_SIZE;i++) | ||
1138 | { | ||
1139 | if (cp->pid == pos->target_list[i]) | ||
1140 | { | ||
1141 | if (msize == 0) | ||
1142 | msize = pos->size; | ||
1143 | else | ||
1144 | msize = GNUNET_MIN (msize, | ||
1145 | pos->size); | ||
1146 | break; | ||
1147 | } | ||
1148 | } | ||
1149 | pos = pos->next; | ||
1150 | } | ||
1151 | if (msize == 0) | ||
1152 | return GNUNET_YES; /* no content available */ | ||
1153 | #if DEBUG_FS | ||
1154 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1155 | "Trying to migrate at least %u bytes to peer `%s'\n", | ||
1156 | msize, | ||
1157 | GNUNET_h2s (key)); | ||
1158 | #endif | ||
1159 | if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK) | ||
1160 | { | ||
1161 | GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task); | ||
1162 | cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; | ||
1163 | } | ||
1164 | cp->cth | ||
1165 | = GNUNET_CORE_notify_transmit_ready (core, | ||
1166 | GNUNET_YES, | ||
1167 | 0, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1168 | (const struct GNUNET_PeerIdentity*) key, | ||
1169 | msize + sizeof (struct PutMessage), | ||
1170 | &transmit_to_peer, | ||
1171 | cp); | ||
1172 | return GNUNET_YES; | ||
1173 | } | ||
1174 | |||
1175 | |||
1176 | /** | ||
1177 | * Task that is run periodically to obtain blocks for content | ||
1178 | * migration | ||
1179 | * | ||
1180 | * @param cls unused | ||
1181 | * @param tc scheduler context (also unused) | ||
1182 | */ | ||
1183 | static void | ||
1184 | gather_migration_blocks (void *cls, | ||
1185 | const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
1186 | |||
1187 | |||
1188 | |||
1189 | |||
1190 | /** | ||
1191 | * Task that is run periodically to obtain blocks for DHT PUTs. | ||
1192 | * | ||
1193 | * @param cls type of blocks to gather | ||
1194 | * @param tc scheduler context (unused) | ||
1195 | */ | ||
1196 | static void | ||
1197 | gather_dht_put_blocks (void *cls, | ||
1198 | const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
1199 | |||
1200 | |||
1201 | /** | ||
1202 | * If the migration task is not currently running, consider | ||
1203 | * (re)scheduling it with the appropriate delay. | ||
1204 | */ | ||
1205 | static void | ||
1206 | consider_migration_gathering () | ||
1207 | { | ||
1208 | struct GNUNET_TIME_Relative delay; | ||
1209 | |||
1210 | if (dsh == NULL) | ||
1211 | return; | ||
1212 | if (mig_qe != NULL) | ||
1213 | return; | ||
1214 | if (mig_task != GNUNET_SCHEDULER_NO_TASK) | ||
1215 | return; | ||
1216 | delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, | ||
1217 | mig_size); | ||
1218 | delay = GNUNET_TIME_relative_divide (delay, | ||
1219 | MAX_MIGRATION_QUEUE); | ||
1220 | delay = GNUNET_TIME_relative_max (delay, | ||
1221 | min_migration_delay); | ||
1222 | mig_task = GNUNET_SCHEDULER_add_delayed (delay, | ||
1223 | &gather_migration_blocks, | ||
1224 | NULL); | ||
1225 | } | ||
1226 | |||
1227 | |||
1228 | /** | ||
1229 | * If the DHT PUT gathering task is not currently running, consider | ||
1230 | * (re)scheduling it with the appropriate delay. | ||
1231 | */ | ||
1232 | static void | ||
1233 | consider_dht_put_gathering (void *cls) | ||
1234 | { | ||
1235 | struct GNUNET_TIME_Relative delay; | ||
1236 | |||
1237 | if (dsh == NULL) | ||
1238 | return; | ||
1239 | if (dht_qe != NULL) | ||
1240 | return; | ||
1241 | if (dht_task != GNUNET_SCHEDULER_NO_TASK) | ||
1242 | return; | ||
1243 | if (zero_anonymity_count_estimate > 0) | ||
1244 | { | ||
1245 | delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY, | ||
1246 | zero_anonymity_count_estimate); | ||
1247 | delay = GNUNET_TIME_relative_min (delay, | ||
1248 | MAX_DHT_PUT_FREQ); | ||
1249 | } | ||
1250 | else | ||
1251 | { | ||
1252 | /* if we have NO zero-anonymity content yet, wait 5 minutes for some to | ||
1253 | (hopefully) appear */ | ||
1254 | delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5); | ||
1255 | } | ||
1256 | dht_task = GNUNET_SCHEDULER_add_delayed (delay, | ||
1257 | &gather_dht_put_blocks, | ||
1258 | cls); | ||
1259 | } | ||
1260 | |||
1261 | |||
1262 | /** | ||
1263 | * Process content offered for migration. | ||
1264 | * | ||
1265 | * @param cls closure | ||
1266 | * @param key key for the content | ||
1267 | * @param size number of bytes in data | ||
1268 | * @param data content stored | ||
1269 | * @param type type of the content | ||
1270 | * @param priority priority of the content | ||
1271 | * @param anonymity anonymity-level for the content | ||
1272 | * @param expiration expiration time for the content | ||
1273 | * @param uid unique identifier for the datum; | ||
1274 | * maybe 0 if no unique identifier is available | ||
1275 | */ | ||
1276 | static void | ||
1277 | process_migration_content (void *cls, | ||
1278 | const GNUNET_HashCode * key, | ||
1279 | size_t size, | ||
1280 | const void *data, | ||
1281 | enum GNUNET_BLOCK_Type type, | ||
1282 | uint32_t priority, | ||
1283 | uint32_t anonymity, | ||
1284 | struct GNUNET_TIME_Absolute | ||
1285 | expiration, uint64_t uid) | ||
1286 | { | ||
1287 | struct MigrationReadyBlock *mb; | ||
1288 | |||
1289 | if (key == NULL) | ||
1290 | { | ||
1291 | mig_qe = NULL; | ||
1292 | if (mig_size < MAX_MIGRATION_QUEUE) | ||
1293 | consider_migration_gathering (); | ||
1294 | return; | ||
1295 | } | ||
1296 | if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < | ||
1297 | MIN_MIGRATION_CONTENT_LIFETIME.rel_value) | ||
1298 | { | ||
1299 | /* content will expire soon, don't bother */ | ||
1300 | GNUNET_DATASTORE_iterate_get_next (dsh); | ||
1301 | return; | ||
1302 | } | ||
1303 | if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) | ||
1304 | { | ||
1305 | if (GNUNET_OK != | ||
1306 | GNUNET_FS_handle_on_demand_block (key, size, data, | ||
1307 | type, priority, anonymity, | ||
1308 | expiration, uid, | ||
1309 | &process_migration_content, | ||
1310 | NULL)) | ||
1311 | { | ||
1312 | GNUNET_DATASTORE_iterate_get_next (dsh); | ||
1313 | } | ||
1314 | return; | ||
1315 | } | ||
1316 | #if DEBUG_FS | ||
1317 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1318 | "Retrieved block `%s' of type %u for migration\n", | ||
1319 | GNUNET_h2s (key), | ||
1320 | type); | ||
1321 | #endif | ||
1322 | mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size); | ||
1323 | mb->query = *key; | ||
1324 | mb->expiration = expiration; | ||
1325 | mb->size = size; | ||
1326 | mb->type = type; | ||
1327 | memcpy (&mb[1], data, size); | ||
1328 | GNUNET_CONTAINER_DLL_insert_after (mig_head, | ||
1329 | mig_tail, | ||
1330 | mig_tail, | ||
1331 | mb); | ||
1332 | mig_size++; | ||
1333 | GNUNET_CONTAINER_multihashmap_iterate (connected_peers, | ||
1334 | &consider_migration, | ||
1335 | mb); | ||
1336 | GNUNET_DATASTORE_iterate_get_next (dsh); | ||
1337 | } | ||
1338 | |||
1339 | |||
1340 | /** | ||
1341 | * Function called upon completion of the DHT PUT operation. | ||
1342 | */ | ||
1343 | static void | ||
1344 | dht_put_continuation (void *cls, | ||
1345 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1346 | { | ||
1347 | GNUNET_DATASTORE_iterate_get_next (dsh); | ||
1348 | } | ||
1349 | |||
1350 | |||
1351 | /** | ||
1352 | * Store content in DHT. | ||
1353 | * | ||
1354 | * @param cls closure | ||
1355 | * @param key key for the content | ||
1356 | * @param size number of bytes in data | ||
1357 | * @param data content stored | ||
1358 | * @param type type of the content | ||
1359 | * @param priority priority of the content | ||
1360 | * @param anonymity anonymity-level for the content | ||
1361 | * @param expiration expiration time for the content | ||
1362 | * @param uid unique identifier for the datum; | ||
1363 | * maybe 0 if no unique identifier is available | ||
1364 | */ | ||
1365 | static void | ||
1366 | process_dht_put_content (void *cls, | ||
1367 | const GNUNET_HashCode * key, | ||
1368 | size_t size, | ||
1369 | const void *data, | ||
1370 | enum GNUNET_BLOCK_Type type, | ||
1371 | uint32_t priority, | ||
1372 | uint32_t anonymity, | ||
1373 | struct GNUNET_TIME_Absolute | ||
1374 | expiration, uint64_t uid) | ||
1375 | { | ||
1376 | static unsigned int counter; | ||
1377 | static GNUNET_HashCode last_vhash; | ||
1378 | static GNUNET_HashCode vhash; | ||
1379 | |||
1380 | if (key == NULL) | ||
1381 | { | ||
1382 | dht_qe = NULL; | ||
1383 | consider_dht_put_gathering (cls); | ||
1384 | return; | ||
1385 | } | ||
1386 | /* slightly funky code to estimate the total number of values with zero | ||
1387 | anonymity from the maximum observed length of a monotonically increasing | ||
1388 | sequence of hashes over the contents */ | ||
1389 | GNUNET_CRYPTO_hash (data, size, &vhash); | ||
1390 | if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0) | ||
1391 | { | ||
1392 | if (zero_anonymity_count_estimate > 0) | ||
1393 | zero_anonymity_count_estimate /= 2; | ||
1394 | counter = 0; | ||
1395 | } | ||
1396 | last_vhash = vhash; | ||
1397 | if (counter < 31) | ||
1398 | counter++; | ||
1399 | if (zero_anonymity_count_estimate < (1 << counter)) | ||
1400 | zero_anonymity_count_estimate = (1 << counter); | ||
1401 | #if DEBUG_FS | ||
1402 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1403 | "Retrieved block `%s' of type %u for DHT PUT\n", | ||
1404 | GNUNET_h2s (key), | ||
1405 | type); | ||
1406 | #endif | ||
1407 | GNUNET_DHT_put (dht_handle, | ||
1408 | key, | ||
1409 | DEFAULT_PUT_REPLICATION, | ||
1410 | GNUNET_DHT_RO_NONE, | ||
1411 | type, | ||
1412 | size, | ||
1413 | data, | ||
1414 | expiration, | ||
1415 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1416 | &dht_put_continuation, | ||
1417 | cls); | ||
1418 | } | ||
1419 | |||
1420 | |||
1421 | /** | ||
1422 | * Task that is run periodically to obtain blocks for content | ||
1423 | * migration | ||
1424 | * | ||
1425 | * @param cls unused | ||
1426 | * @param tc scheduler context (also unused) | ||
1427 | */ | ||
1428 | static void | ||
1429 | gather_migration_blocks (void *cls, | ||
1430 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1431 | { | ||
1432 | mig_task = GNUNET_SCHEDULER_NO_TASK; | ||
1433 | if (dsh != NULL) | ||
1434 | { | ||
1435 | mig_qe = GNUNET_DATASTORE_get_for_replication (dsh, 0, UINT_MAX, | ||
1436 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1437 | &process_migration_content, NULL); | ||
1438 | GNUNET_assert (mig_qe != NULL); | ||
1439 | } | ||
1440 | } | ||
1441 | |||
1442 | |||
1443 | /** | ||
1444 | * Task that is run periodically to obtain blocks for DHT PUTs. | ||
1445 | * | ||
1446 | * @param cls type of blocks to gather | ||
1447 | * @param tc scheduler context (unused) | ||
1448 | */ | ||
1449 | static void | ||
1450 | gather_dht_put_blocks (void *cls, | ||
1451 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1452 | { | ||
1453 | dht_task = GNUNET_SCHEDULER_NO_TASK; | ||
1454 | if (dsh != NULL) | ||
1455 | { | ||
1456 | if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) | ||
1457 | dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; | ||
1458 | dht_qe = GNUNET_DATASTORE_iterate_zero_anonymity (dsh, 0, UINT_MAX, | ||
1459 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1460 | dht_put_type++, | ||
1461 | &process_dht_put_content, NULL); | ||
1462 | GNUNET_assert (dht_qe != NULL); | ||
1463 | } | ||
1464 | } | ||
1465 | |||
1466 | |||
1467 | /** | ||
1468 | * We're done with a particular message list entry. | ||
1469 | * Free all associated resources. | ||
1470 | * | ||
1471 | * @param pml entry to destroy | ||
1472 | */ | ||
1473 | static void | ||
1474 | destroy_pending_message_list_entry (struct PendingMessageList *pml) | ||
1475 | { | ||
1476 | GNUNET_CONTAINER_DLL_remove (pml->req->pending_head, | ||
1477 | pml->req->pending_tail, | ||
1478 | pml); | ||
1479 | GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head, | ||
1480 | pml->target->pending_messages_tail, | ||
1481 | pml->pm); | ||
1482 | GNUNET_assert (pml->target->pending_requests > 0); | ||
1483 | pml->target->pending_requests--; | ||
1484 | GNUNET_free (pml->pm); | ||
1485 | GNUNET_free (pml); | ||
1486 | } | ||
1487 | |||
1488 | |||
1489 | /** | ||
1490 | * Destroy the given pending message (and call the respective | ||
1491 | * continuation). | ||
1492 | * | ||
1493 | * @param pm message to destroy | ||
1494 | * @param tpid id of peer that the message was delivered to, or 0 for none | ||
1495 | */ | ||
1496 | static void | ||
1497 | destroy_pending_message (struct PendingMessage *pm, | ||
1498 | GNUNET_PEER_Id tpid) | ||
1499 | { | ||
1500 | struct PendingMessageList *pml = pm->pml; | ||
1501 | TransmissionContinuation cont; | ||
1502 | void *cont_cls; | ||
1503 | |||
1504 | cont = pm->cont; | ||
1505 | cont_cls = pm->cont_cls; | ||
1506 | if (pml != NULL) | ||
1507 | { | ||
1508 | GNUNET_assert (pml->pm == pm); | ||
1509 | GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) ); | ||
1510 | destroy_pending_message_list_entry (pml); | ||
1511 | } | ||
1512 | else | ||
1513 | { | ||
1514 | GNUNET_free (pm); | ||
1515 | } | ||
1516 | if (cont != NULL) | ||
1517 | cont (cont_cls, tpid); | ||
1518 | } | ||
1519 | |||
1520 | |||
1521 | /** | ||
1522 | * We're done processing a particular request. | ||
1523 | * Free all associated resources. | ||
1524 | * | ||
1525 | * @param pr request to destroy | ||
1526 | */ | ||
1527 | static void | ||
1528 | destroy_pending_request (struct PendingRequest *pr) | ||
1529 | { | ||
1530 | struct GNUNET_PeerIdentity pid; | ||
1531 | unsigned int i; | ||
1532 | |||
1533 | if (pr->hnode != NULL) | ||
1534 | { | ||
1535 | GNUNET_CONTAINER_heap_remove_node (pr->hnode); | ||
1536 | pr->hnode = NULL; | ||
1537 | } | ||
1538 | if (NULL == pr->client_request_list) | ||
1539 | { | ||
1540 | GNUNET_STATISTICS_update (stats, | ||
1541 | gettext_noop ("# P2P searches active"), | ||
1542 | -1, | ||
1543 | GNUNET_NO); | ||
1544 | } | ||
1545 | else | ||
1546 | { | ||
1547 | GNUNET_STATISTICS_update (stats, | ||
1548 | gettext_noop ("# client searches active"), | ||
1549 | -1, | ||
1550 | GNUNET_NO); | ||
1551 | } | ||
1552 | if (GNUNET_YES == | ||
1553 | GNUNET_CONTAINER_multihashmap_remove (query_request_map, | ||
1554 | &pr->query, | ||
1555 | pr)) | ||
1556 | { | ||
1557 | GNUNET_LOAD_update (rt_entry_lifetime, | ||
1558 | GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value); | ||
1559 | } | ||
1560 | if (pr->qe != NULL) | ||
1561 | { | ||
1562 | GNUNET_DATASTORE_cancel (pr->qe); | ||
1563 | pr->qe = NULL; | ||
1564 | } | ||
1565 | if (pr->dht_get != NULL) | ||
1566 | { | ||
1567 | GNUNET_DHT_get_stop (pr->dht_get); | ||
1568 | pr->dht_get = NULL; | ||
1569 | } | ||
1570 | if (pr->client_request_list != NULL) | ||
1571 | { | ||
1572 | GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head, | ||
1573 | pr->client_request_list->client_list->rl_tail, | ||
1574 | pr->client_request_list); | ||
1575 | GNUNET_free (pr->client_request_list); | ||
1576 | pr->client_request_list = NULL; | ||
1577 | } | ||
1578 | if (pr->cp != NULL) | ||
1579 | { | ||
1580 | GNUNET_PEER_resolve (pr->cp->pid, | ||
1581 | &pid); | ||
1582 | (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map, | ||
1583 | &pid.hashPubKey, | ||
1584 | pr); | ||
1585 | pr->cp = NULL; | ||
1586 | } | ||
1587 | if (pr->bf != NULL) | ||
1588 | { | ||
1589 | GNUNET_CONTAINER_bloomfilter_free (pr->bf); | ||
1590 | pr->bf = NULL; | ||
1591 | } | ||
1592 | if (pr->pirc != NULL) | ||
1593 | { | ||
1594 | GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc); | ||
1595 | pr->pirc->irc = NULL; | ||
1596 | pr->pirc = NULL; | ||
1597 | } | ||
1598 | if (pr->replies_seen != NULL) | ||
1599 | { | ||
1600 | GNUNET_free (pr->replies_seen); | ||
1601 | pr->replies_seen = NULL; | ||
1602 | } | ||
1603 | if (pr->task != GNUNET_SCHEDULER_NO_TASK) | ||
1604 | { | ||
1605 | GNUNET_SCHEDULER_cancel (pr->task); | ||
1606 | pr->task = GNUNET_SCHEDULER_NO_TASK; | ||
1607 | } | ||
1608 | while (NULL != pr->pending_head) | ||
1609 | destroy_pending_message_list_entry (pr->pending_head); | ||
1610 | GNUNET_PEER_change_rc (pr->target_pid, -1); | ||
1611 | if (pr->used_targets != NULL) | ||
1612 | { | ||
1613 | for (i=0;i<pr->used_targets_off;i++) | ||
1614 | GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1); | ||
1615 | GNUNET_free (pr->used_targets); | ||
1616 | pr->used_targets_off = 0; | ||
1617 | pr->used_targets_size = 0; | ||
1618 | pr->used_targets = NULL; | ||
1619 | } | ||
1620 | GNUNET_free (pr); | ||
1621 | } | ||
1622 | |||
1623 | |||
1624 | /** | ||
1625 | * Find latency information in 'atsi'. | ||
1626 | * | ||
1627 | * @param atsi performance data | ||
1628 | * @return connection latency | ||
1629 | */ | ||
1630 | static struct GNUNET_TIME_Relative | ||
1631 | get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi) | ||
1632 | { | ||
1633 | if (atsi == NULL) | ||
1634 | return GNUNET_TIME_UNIT_SECONDS; | ||
1635 | while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) && | ||
1636 | (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) ) | ||
1637 | atsi++; | ||
1638 | if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) | ||
1639 | { | ||
1640 | GNUNET_break (0); | ||
1641 | /* how can we not have latency data? */ | ||
1642 | return GNUNET_TIME_UNIT_SECONDS; | ||
1643 | } | ||
1644 | return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, | ||
1645 | ntohl (atsi->value)); | ||
1646 | } | ||
1647 | |||
1648 | |||
1649 | /** | ||
1650 | * Method called whenever a given peer connects. | ||
1651 | * | ||
1652 | * @param cls closure, not used | ||
1653 | * @param peer peer identity this notification is about | ||
1654 | * @param atsi performance information | ||
1655 | */ | ||
1656 | static void | ||
1657 | peer_connect_handler (void *cls, | ||
1658 | const struct | ||
1659 | GNUNET_PeerIdentity * peer, | ||
1660 | const struct GNUNET_TRANSPORT_ATS_Information *atsi) | ||
1661 | { | ||
1662 | struct ConnectedPeer *cp; | ||
1663 | struct MigrationReadyBlock *pos; | ||
1664 | char *fn; | ||
1665 | uint32_t trust; | ||
1666 | struct GNUNET_TIME_Relative latency; | ||
1667 | |||
1668 | if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity))) | ||
1669 | return; | ||
1670 | latency = get_latency (atsi); | ||
1671 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
1672 | &peer->hashPubKey); | ||
1673 | if (NULL != cp) | ||
1674 | { | ||
1675 | GNUNET_break (0); | ||
1676 | return; | ||
1677 | } | ||
1678 | cp = GNUNET_malloc (sizeof (struct ConnectedPeer)); | ||
1679 | cp->transmission_delay = GNUNET_LOAD_value_init (latency); | ||
1680 | cp->pid = GNUNET_PEER_intern (peer); | ||
1681 | |||
1682 | fn = get_trust_filename (peer); | ||
1683 | if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) && | ||
1684 | (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust)))) | ||
1685 | cp->disk_trust = cp->trust = ntohl (trust); | ||
1686 | GNUNET_free (fn); | ||
1687 | |||
1688 | GNUNET_break (GNUNET_OK == | ||
1689 | GNUNET_CONTAINER_multihashmap_put (connected_peers, | ||
1690 | &peer->hashPubKey, | ||
1691 | cp, | ||
1692 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
1693 | |||
1694 | pos = mig_head; | ||
1695 | while (NULL != pos) | ||
1696 | { | ||
1697 | (void) consider_migration (pos, &peer->hashPubKey, cp); | ||
1698 | pos = pos->next; | ||
1699 | } | ||
1700 | } | ||
1701 | |||
1702 | |||
1703 | /** | ||
1704 | * Method called whenever a given peer has a status change. | ||
1705 | * | ||
1706 | * @param cls closure | ||
1707 | * @param peer peer identity this notification is about | ||
1708 | * @param bandwidth_in available amount of inbound bandwidth | ||
1709 | * @param bandwidth_out available amount of outbound bandwidth | ||
1710 | * @param timeout absolute time when this peer will time out | ||
1711 | * unless we see some further activity from it | ||
1712 | * @param atsi status information | ||
1713 | */ | ||
1714 | static void | ||
1715 | peer_status_handler (void *cls, | ||
1716 | const struct | ||
1717 | GNUNET_PeerIdentity * peer, | ||
1718 | struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in, | ||
1719 | struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, | ||
1720 | struct GNUNET_TIME_Absolute timeout, | ||
1721 | const struct GNUNET_TRANSPORT_ATS_Information *atsi) | ||
1722 | { | ||
1723 | struct ConnectedPeer *cp; | ||
1724 | struct GNUNET_TIME_Relative latency; | ||
1725 | |||
1726 | latency = get_latency (atsi); | ||
1727 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
1728 | &peer->hashPubKey); | ||
1729 | if (cp == NULL) | ||
1730 | { | ||
1731 | GNUNET_break (0); | ||
1732 | return; | ||
1733 | } | ||
1734 | GNUNET_LOAD_value_set_decline (cp->transmission_delay, | ||
1735 | latency); | ||
1736 | } | ||
1737 | |||
1738 | |||
1739 | |||
1740 | /** | ||
1741 | * Increase the host credit by a value. | ||
1742 | * | ||
1743 | * @param host which peer to change the trust value on | ||
1744 | * @param value is the int value by which the | ||
1745 | * host credit is to be increased or decreased | ||
1746 | * @returns the actual change in trust (positive or negative) | ||
1747 | */ | ||
1748 | static int | ||
1749 | change_host_trust (struct ConnectedPeer *host, int value) | ||
1750 | { | ||
1751 | if (value == 0) | ||
1752 | return 0; | ||
1753 | GNUNET_assert (host != NULL); | ||
1754 | if (value > 0) | ||
1755 | { | ||
1756 | if (host->trust + value < host->trust) | ||
1757 | { | ||
1758 | value = UINT32_MAX - host->trust; | ||
1759 | host->trust = UINT32_MAX; | ||
1760 | } | ||
1761 | else | ||
1762 | host->trust += value; | ||
1763 | } | ||
1764 | else | ||
1765 | { | ||
1766 | if (host->trust < -value) | ||
1767 | { | ||
1768 | value = -host->trust; | ||
1769 | host->trust = 0; | ||
1770 | } | ||
1771 | else | ||
1772 | host->trust += value; | ||
1773 | } | ||
1774 | return value; | ||
1775 | } | ||
1776 | |||
1777 | |||
1778 | /** | ||
1779 | * Write host-trust information to a file - flush the buffer entry! | ||
1780 | */ | ||
1781 | static int | ||
1782 | flush_trust (void *cls, | ||
1783 | const GNUNET_HashCode *key, | ||
1784 | void *value) | ||
1785 | { | ||
1786 | struct ConnectedPeer *host = value; | ||
1787 | char *fn; | ||
1788 | uint32_t trust; | ||
1789 | struct GNUNET_PeerIdentity pid; | ||
1790 | |||
1791 | if (host->trust == host->disk_trust) | ||
1792 | return GNUNET_OK; /* unchanged */ | ||
1793 | GNUNET_PEER_resolve (host->pid, | ||
1794 | &pid); | ||
1795 | fn = get_trust_filename (&pid); | ||
1796 | if (host->trust == 0) | ||
1797 | { | ||
1798 | if ((0 != UNLINK (fn)) && (errno != ENOENT)) | ||
1799 | GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING | | ||
1800 | GNUNET_ERROR_TYPE_BULK, "unlink", fn); | ||
1801 | } | ||
1802 | else | ||
1803 | { | ||
1804 | trust = htonl (host->trust); | ||
1805 | if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, | ||
1806 | sizeof(uint32_t), | ||
1807 | GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE | ||
1808 | | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ)) | ||
1809 | host->disk_trust = host->trust; | ||
1810 | } | ||
1811 | GNUNET_free (fn); | ||
1812 | return GNUNET_OK; | ||
1813 | } | ||
1814 | |||
1815 | /** | ||
1816 | * Call this method periodically to scan data/hosts for new hosts. | ||
1817 | */ | ||
1818 | static void | ||
1819 | cron_flush_trust (void *cls, | ||
1820 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1821 | { | ||
1822 | |||
1823 | if (NULL == connected_peers) | ||
1824 | return; | ||
1825 | GNUNET_CONTAINER_multihashmap_iterate (connected_peers, | ||
1826 | &flush_trust, | ||
1827 | NULL); | ||
1828 | if (NULL == tc) | ||
1829 | return; | ||
1830 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | ||
1831 | return; | ||
1832 | GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL); | ||
1833 | } | ||
1834 | |||
1835 | |||
1836 | /** | ||
1837 | * Free (each) request made by the peer. | ||
1838 | * | ||
1839 | * @param cls closure, points to peer that the request belongs to | ||
1840 | * @param key current key code | ||
1841 | * @param value value in the hash map | ||
1842 | * @return GNUNET_YES (we should continue to iterate) | ||
1843 | */ | 122 | */ |
1844 | static int | 123 | struct GNUNET_BLOCK_Context *GSF_block_ctx; |
1845 | destroy_request (void *cls, | ||
1846 | const GNUNET_HashCode * key, | ||
1847 | void *value) | ||
1848 | { | ||
1849 | const struct GNUNET_PeerIdentity * peer = cls; | ||
1850 | struct PendingRequest *pr = value; | ||
1851 | |||
1852 | GNUNET_break (GNUNET_YES == | ||
1853 | GNUNET_CONTAINER_multihashmap_remove (peer_request_map, | ||
1854 | &peer->hashPubKey, | ||
1855 | pr)); | ||
1856 | destroy_pending_request (pr); | ||
1857 | return GNUNET_YES; | ||
1858 | } | ||
1859 | |||
1860 | 124 | ||
1861 | /** | 125 | /** |
1862 | * Method called whenever a peer disconnects. | 126 | * Pointer to handle to the core service (points to NULL until we've |
1863 | * | 127 | * connected to it). |
1864 | * @param cls closure, not used | ||
1865 | * @param peer peer identity this notification is about | ||
1866 | */ | 128 | */ |
1867 | static void | 129 | struct GNUNET_CORE_Handle *GSF_core; |
1868 | peer_disconnect_handler (void *cls, | ||
1869 | const struct | ||
1870 | GNUNET_PeerIdentity * peer) | ||
1871 | { | ||
1872 | struct ConnectedPeer *cp; | ||
1873 | struct PendingMessage *pm; | ||
1874 | unsigned int i; | ||
1875 | struct MigrationReadyBlock *pos; | ||
1876 | struct MigrationReadyBlock *next; | ||
1877 | 130 | ||
1878 | if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity))) | ||
1879 | return; | ||
1880 | GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map, | ||
1881 | &peer->hashPubKey, | ||
1882 | &destroy_request, | ||
1883 | (void*) peer); | ||
1884 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
1885 | &peer->hashPubKey); | ||
1886 | if (cp == NULL) | ||
1887 | return; | ||
1888 | for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++) | ||
1889 | { | ||
1890 | if (NULL != cp->last_client_replies[i]) | ||
1891 | { | ||
1892 | GNUNET_SERVER_client_drop (cp->last_client_replies[i]); | ||
1893 | cp->last_client_replies[i] = NULL; | ||
1894 | } | ||
1895 | } | ||
1896 | GNUNET_break (GNUNET_YES == | ||
1897 | GNUNET_CONTAINER_multihashmap_remove (connected_peers, | ||
1898 | &peer->hashPubKey, | ||
1899 | cp)); | ||
1900 | if (cp->irc != NULL) | ||
1901 | { | ||
1902 | GNUNET_CORE_peer_change_preference_cancel (cp->irc); | ||
1903 | cp->irc = NULL; | ||
1904 | cp->pr->pirc = NULL; | ||
1905 | cp->pr = NULL; | ||
1906 | } | ||
1907 | |||
1908 | /* remove this peer from migration considerations; schedule | ||
1909 | alternatives */ | ||
1910 | next = mig_head; | ||
1911 | while (NULL != (pos = next)) | ||
1912 | { | ||
1913 | next = pos->next; | ||
1914 | for (i=0;i<MIGRATION_LIST_SIZE;i++) | ||
1915 | { | ||
1916 | if (pos->target_list[i] == cp->pid) | ||
1917 | { | ||
1918 | GNUNET_PEER_change_rc (pos->target_list[i], -1); | ||
1919 | pos->target_list[i] = 0; | ||
1920 | } | ||
1921 | } | ||
1922 | if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) | ||
1923 | { | ||
1924 | delete_migration_block (pos); | ||
1925 | consider_migration_gathering (); | ||
1926 | continue; | ||
1927 | } | ||
1928 | GNUNET_CONTAINER_multihashmap_iterate (connected_peers, | ||
1929 | &consider_migration, | ||
1930 | pos); | ||
1931 | } | ||
1932 | GNUNET_PEER_change_rc (cp->pid, -1); | ||
1933 | GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); | ||
1934 | if (NULL != cp->cth) | ||
1935 | { | ||
1936 | GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); | ||
1937 | cp->cth = NULL; | ||
1938 | } | ||
1939 | if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK) | ||
1940 | { | ||
1941 | GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task); | ||
1942 | cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; | ||
1943 | } | ||
1944 | while (NULL != (pm = cp->pending_messages_head)) | ||
1945 | destroy_pending_message (pm, 0 /* delivery failed */); | ||
1946 | GNUNET_LOAD_value_free (cp->transmission_delay); | ||
1947 | GNUNET_break (0 == cp->pending_requests); | ||
1948 | GNUNET_free (cp); | ||
1949 | } | ||
1950 | 131 | ||
132 | /* ***************************** locals ******************************* */ | ||
1951 | 133 | ||
1952 | /** | 134 | /** |
1953 | * Iterator over hash map entries that removes all occurences | 135 | * Configuration for block library. |
1954 | * of the given 'client' from the 'last_client_replies' of the | ||
1955 | * given connected peer. | ||
1956 | * | ||
1957 | * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove | ||
1958 | * @param key current key code (unused) | ||
1959 | * @param value value in the hash map (the 'struct ConnectedPeer*' to change) | ||
1960 | * @return GNUNET_YES (we should continue to iterate) | ||
1961 | */ | 136 | */ |
1962 | static int | 137 | static struct GNUNET_CONFIGURATION_Handle *block_cfg; |
1963 | remove_client_from_last_client_replies (void *cls, | ||
1964 | const GNUNET_HashCode * key, | ||
1965 | void *value) | ||
1966 | { | ||
1967 | struct GNUNET_SERVER_Client *client = cls; | ||
1968 | struct ConnectedPeer *cp = value; | ||
1969 | unsigned int i; | ||
1970 | |||
1971 | for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++) | ||
1972 | { | ||
1973 | if (cp->last_client_replies[i] == client) | ||
1974 | { | ||
1975 | GNUNET_SERVER_client_drop (cp->last_client_replies[i]); | ||
1976 | cp->last_client_replies[i] = NULL; | ||
1977 | } | ||
1978 | } | ||
1979 | return GNUNET_YES; | ||
1980 | } | ||
1981 | |||
1982 | 138 | ||
1983 | /** | 139 | /** |
1984 | * A client disconnected. Remove all of its pending queries. | 140 | * ID of our task that we use to age the cover counters. |
1985 | * | ||
1986 | * @param cls closure, NULL | ||
1987 | * @param client identification of the client | ||
1988 | */ | 141 | */ |
1989 | static void | 142 | static GNUNET_SCHEDULER_TaskIdentifier cover_age_task; |
1990 | handle_client_disconnect (void *cls, | ||
1991 | struct GNUNET_SERVER_Client | ||
1992 | * client) | ||
1993 | { | ||
1994 | struct ClientList *pos; | ||
1995 | struct ClientList *prev; | ||
1996 | struct ClientRequestList *rcl; | ||
1997 | struct ClientResponseMessage *creply; | ||
1998 | |||
1999 | if (client == NULL) | ||
2000 | return; | ||
2001 | prev = NULL; | ||
2002 | pos = client_list; | ||
2003 | while ( (NULL != pos) && | ||
2004 | (pos->client != client) ) | ||
2005 | { | ||
2006 | prev = pos; | ||
2007 | pos = pos->next; | ||
2008 | } | ||
2009 | if (pos == NULL) | ||
2010 | return; /* no requests pending for this client */ | ||
2011 | while (NULL != (rcl = pos->rl_head)) | ||
2012 | { | ||
2013 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
2014 | "Destroying pending request `%s' on disconnect\n", | ||
2015 | GNUNET_h2s (&rcl->req->query)); | ||
2016 | destroy_pending_request (rcl->req); | ||
2017 | } | ||
2018 | if (prev == NULL) | ||
2019 | client_list = pos->next; | ||
2020 | else | ||
2021 | prev->next = pos->next; | ||
2022 | if (pos->th != NULL) | ||
2023 | { | ||
2024 | GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th); | ||
2025 | pos->th = NULL; | ||
2026 | } | ||
2027 | while (NULL != (creply = pos->res_head)) | ||
2028 | { | ||
2029 | GNUNET_CONTAINER_DLL_remove (pos->res_head, | ||
2030 | pos->res_tail, | ||
2031 | creply); | ||
2032 | GNUNET_free (creply); | ||
2033 | } | ||
2034 | GNUNET_SERVER_client_drop (pos->client); | ||
2035 | GNUNET_free (pos); | ||
2036 | GNUNET_CONTAINER_multihashmap_iterate (connected_peers, | ||
2037 | &remove_client_from_last_client_replies, | ||
2038 | client); | ||
2039 | } | ||
2040 | |||
2041 | 143 | ||
2042 | /** | 144 | /** |
2043 | * Iterator to free peer entries. | 145 | * Datastore 'GET' load tracking. |
2044 | * | ||
2045 | * @param cls closure, unused | ||
2046 | * @param key current key code | ||
2047 | * @param value value in the hash map (peer entry) | ||
2048 | * @return GNUNET_YES (we should continue to iterate) | ||
2049 | */ | 146 | */ |
2050 | static int | 147 | static struct GNUNET_LOAD_Value *datastore_get_load; |
2051 | clean_peer (void *cls, | ||
2052 | const GNUNET_HashCode * key, | ||
2053 | void *value) | ||
2054 | { | ||
2055 | peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key); | ||
2056 | return GNUNET_YES; | ||
2057 | } | ||
2058 | |||
2059 | 148 | ||
2060 | /** | 149 | /** |
2061 | * Task run during shutdown. | 150 | * Identity of this peer. |
2062 | * | ||
2063 | * @param cls unused | ||
2064 | * @param tc unused | ||
2065 | */ | 151 | */ |
2066 | static void | 152 | static struct GNUNET_PeerIdentity my_id; |
2067 | shutdown_task (void *cls, | ||
2068 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
2069 | { | ||
2070 | if (mig_qe != NULL) | ||
2071 | { | ||
2072 | GNUNET_DATASTORE_cancel (mig_qe); | ||
2073 | mig_qe = NULL; | ||
2074 | } | ||
2075 | if (dht_qe != NULL) | ||
2076 | { | ||
2077 | GNUNET_DATASTORE_cancel (dht_qe); | ||
2078 | dht_qe = NULL; | ||
2079 | } | ||
2080 | if (GNUNET_SCHEDULER_NO_TASK != mig_task) | ||
2081 | { | ||
2082 | GNUNET_SCHEDULER_cancel (mig_task); | ||
2083 | mig_task = GNUNET_SCHEDULER_NO_TASK; | ||
2084 | } | ||
2085 | if (GNUNET_SCHEDULER_NO_TASK != dht_task) | ||
2086 | { | ||
2087 | GNUNET_SCHEDULER_cancel (dht_task); | ||
2088 | dht_task = GNUNET_SCHEDULER_NO_TASK; | ||
2089 | } | ||
2090 | while (client_list != NULL) | ||
2091 | handle_client_disconnect (NULL, | ||
2092 | client_list->client); | ||
2093 | cron_flush_trust (NULL, NULL); | ||
2094 | GNUNET_assert (NULL != core); | ||
2095 | GNUNET_CORE_disconnect (core); | ||
2096 | core = NULL; | ||
2097 | GNUNET_CONTAINER_multihashmap_iterate (connected_peers, | ||
2098 | &clean_peer, | ||
2099 | NULL); | ||
2100 | GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap)); | ||
2101 | GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap); | ||
2102 | requests_by_expiration_heap = 0; | ||
2103 | GNUNET_CONTAINER_multihashmap_destroy (connected_peers); | ||
2104 | connected_peers = NULL; | ||
2105 | GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map)); | ||
2106 | GNUNET_CONTAINER_multihashmap_destroy (query_request_map); | ||
2107 | query_request_map = NULL; | ||
2108 | GNUNET_LOAD_value_free (rt_entry_lifetime); | ||
2109 | rt_entry_lifetime = NULL; | ||
2110 | GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map)); | ||
2111 | GNUNET_CONTAINER_multihashmap_destroy (peer_request_map); | ||
2112 | peer_request_map = NULL; | ||
2113 | if (stats != NULL) | ||
2114 | { | ||
2115 | GNUNET_STATISTICS_destroy (stats, GNUNET_NO); | ||
2116 | stats = NULL; | ||
2117 | } | ||
2118 | if (dsh != NULL) | ||
2119 | { | ||
2120 | GNUNET_DATASTORE_disconnect (dsh, | ||
2121 | GNUNET_NO); | ||
2122 | dsh = NULL; | ||
2123 | } | ||
2124 | while (mig_head != NULL) | ||
2125 | delete_migration_block (mig_head); | ||
2126 | GNUNET_assert (0 == mig_size); | ||
2127 | GNUNET_DHT_disconnect (dht_handle); | ||
2128 | dht_handle = NULL; | ||
2129 | GNUNET_LOAD_value_free (datastore_get_load); | ||
2130 | datastore_get_load = NULL; | ||
2131 | GNUNET_LOAD_value_free (datastore_put_load); | ||
2132 | datastore_put_load = NULL; | ||
2133 | GNUNET_BLOCK_context_destroy (block_ctx); | ||
2134 | block_ctx = NULL; | ||
2135 | GNUNET_CONFIGURATION_destroy (block_cfg); | ||
2136 | block_cfg = NULL; | ||
2137 | cfg = NULL; | ||
2138 | GNUNET_free_non_null (trustDirectory); | ||
2139 | trustDirectory = NULL; | ||
2140 | GNUNET_SCHEDULER_cancel (cover_age_task); | ||
2141 | cover_age_task = GNUNET_SCHEDULER_NO_TASK; | ||
2142 | } | ||
2143 | |||
2144 | |||
2145 | /* ******************* Utility functions ******************** */ | ||
2146 | |||
2147 | 153 | ||
2148 | /** | 154 | /** |
2149 | * We've had to delay a request for transmission to core, but now | 155 | * Task that periodically ages our cover traffic statistics. |
2150 | * we should be ready. Run it. | ||
2151 | * | 156 | * |
2152 | * @param cls the 'struct ConnectedPeer' for which a request was delayed | 157 | * @param cls unused closure |
2153 | * @param tc task context (unused) | 158 | * @param tc task context |
2154 | */ | 159 | */ |
2155 | static void | 160 | static void |
2156 | delayed_transmission_request (void *cls, | 161 | age_cover_counters (void *cls, |
2157 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 162 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
2158 | { | 163 | { |
2159 | struct ConnectedPeer *cp = cls; | 164 | GSF_cover_content_count = (GSF_cover_content_count * 15) / 16; |
2160 | struct GNUNET_PeerIdentity pid; | 165 | GSF_cover_query_count = (GSF_cover_query_count * 15) / 16; |
2161 | struct PendingMessage *pm; | 166 | cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, |
2162 | 167 | &age_cover_counters, | |
2163 | pm = cp->pending_messages_head; | 168 | NULL); |
2164 | cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; | ||
2165 | GNUNET_assert (cp->cth == NULL); | ||
2166 | if (pm == NULL) | ||
2167 | return; | ||
2168 | GNUNET_PEER_resolve (cp->pid, | ||
2169 | &pid); | ||
2170 | cp->last_transmission_request_start = GNUNET_TIME_absolute_get (); | ||
2171 | cp->cth = GNUNET_CORE_notify_transmit_ready (core, | ||
2172 | GNUNET_YES, | ||
2173 | pm->priority, | ||
2174 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
2175 | &pid, | ||
2176 | pm->msize, | ||
2177 | &transmit_to_peer, | ||
2178 | cp); | ||
2179 | } | 169 | } |
2180 | 170 | ||
2181 | 171 | ||
2182 | /** | ||
2183 | * Transmit messages by copying it to the target buffer | ||
2184 | * "buf". "buf" will be NULL and "size" zero if the socket was closed | ||
2185 | * for writing in the meantime. In that case, do nothing | ||
2186 | * (the disconnect or shutdown handler will take care of the rest). | ||
2187 | * If we were able to transmit messages and there are still more | ||
2188 | * pending, ask core again for further calls to this function. | ||
2189 | * | ||
2190 | * @param cls closure, pointer to the 'struct ConnectedPeer*' | ||
2191 | * @param size number of bytes available in buf | ||
2192 | * @param buf where the callee should write the message | ||
2193 | * @return number of bytes written to buf | ||
2194 | */ | ||
2195 | static size_t | ||
2196 | transmit_to_peer (void *cls, | ||
2197 | size_t size, void *buf) | ||
2198 | { | ||
2199 | struct ConnectedPeer *cp = cls; | ||
2200 | char *cbuf = buf; | ||
2201 | struct PendingMessage *pm; | ||
2202 | struct PendingMessage *next_pm; | ||
2203 | struct GNUNET_TIME_Absolute now; | ||
2204 | struct GNUNET_TIME_Relative min_delay; | ||
2205 | struct MigrationReadyBlock *mb; | ||
2206 | struct MigrationReadyBlock *next; | ||
2207 | struct PutMessage migm; | ||
2208 | size_t msize; | ||
2209 | unsigned int i; | ||
2210 | struct GNUNET_PeerIdentity pid; | ||
2211 | |||
2212 | cp->cth = NULL; | ||
2213 | if (NULL == buf) | ||
2214 | { | ||
2215 | #if DEBUG_FS | ||
2216 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2217 | "Dropping message, core too busy.\n"); | ||
2218 | #endif | ||
2219 | GNUNET_LOAD_update (cp->transmission_delay, | ||
2220 | UINT64_MAX); | ||
2221 | |||
2222 | if (NULL != (pm = cp->pending_messages_head)) | ||
2223 | { | ||
2224 | GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head, | ||
2225 | cp->pending_messages_tail, | ||
2226 | pm); | ||
2227 | GNUNET_assert (cp->pending_requests > 0); | ||
2228 | cp->pending_requests--; | ||
2229 | destroy_pending_message (pm, 0); | ||
2230 | } | ||
2231 | if (NULL != (pm = cp->pending_messages_head)) | ||
2232 | { | ||
2233 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task); | ||
2234 | min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until); | ||
2235 | cp->delayed_transmission_request_task | ||
2236 | = GNUNET_SCHEDULER_add_delayed (min_delay, | ||
2237 | &delayed_transmission_request, | ||
2238 | cp); | ||
2239 | } | ||
2240 | return 0; | ||
2241 | } | ||
2242 | GNUNET_LOAD_update (cp->transmission_delay, | ||
2243 | GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).rel_value); | ||
2244 | now = GNUNET_TIME_absolute_get (); | ||
2245 | msize = 0; | ||
2246 | min_delay = GNUNET_TIME_UNIT_FOREVER_REL; | ||
2247 | next_pm = cp->pending_messages_head; | ||
2248 | while ( (NULL != (pm = next_pm) ) && | ||
2249 | (pm->msize <= size) ) | ||
2250 | { | ||
2251 | next_pm = pm->next; | ||
2252 | if (pm->delay_until.abs_value > now.abs_value) | ||
2253 | { | ||
2254 | min_delay = GNUNET_TIME_relative_min (min_delay, | ||
2255 | GNUNET_TIME_absolute_get_remaining (pm->delay_until)); | ||
2256 | continue; | ||
2257 | } | ||
2258 | memcpy (&cbuf[msize], &pm[1], pm->msize); | ||
2259 | msize += pm->msize; | ||
2260 | size -= pm->msize; | ||
2261 | if (NULL == pm->pml) | ||
2262 | { | ||
2263 | GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head, | ||
2264 | cp->pending_messages_tail, | ||
2265 | pm); | ||
2266 | GNUNET_assert (cp->pending_requests > 0); | ||
2267 | cp->pending_requests--; | ||
2268 | } | ||
2269 | destroy_pending_message (pm, cp->pid); | ||
2270 | } | ||
2271 | if (pm != NULL) | ||
2272 | min_delay = GNUNET_TIME_UNIT_ZERO; | ||
2273 | if (NULL != cp->pending_messages_head) | ||
2274 | { | ||
2275 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task); | ||
2276 | cp->delayed_transmission_request_task | ||
2277 | = GNUNET_SCHEDULER_add_delayed (min_delay, | ||
2278 | &delayed_transmission_request, | ||
2279 | cp); | ||
2280 | } | ||
2281 | if (pm == NULL) | ||
2282 | { | ||
2283 | GNUNET_PEER_resolve (cp->pid, | ||
2284 | &pid); | ||
2285 | next = mig_head; | ||
2286 | while (NULL != (mb = next)) | ||
2287 | { | ||
2288 | next = mb->next; | ||
2289 | for (i=0;i<MIGRATION_LIST_SIZE;i++) | ||
2290 | { | ||
2291 | if ( (cp->pid == mb->target_list[i]) && | ||
2292 | (mb->size + sizeof (migm) <= size) ) | ||
2293 | { | ||
2294 | GNUNET_PEER_change_rc (mb->target_list[i], -1); | ||
2295 | mb->target_list[i] = 0; | ||
2296 | mb->used_targets++; | ||
2297 | memset (&migm, 0, sizeof (migm)); | ||
2298 | migm.header.size = htons (sizeof (migm) + mb->size); | ||
2299 | migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); | ||
2300 | migm.type = htonl (mb->type); | ||
2301 | migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration); | ||
2302 | memcpy (&cbuf[msize], &migm, sizeof (migm)); | ||
2303 | msize += sizeof (migm); | ||
2304 | size -= sizeof (migm); | ||
2305 | memcpy (&cbuf[msize], &mb[1], mb->size); | ||
2306 | msize += mb->size; | ||
2307 | size -= mb->size; | ||
2308 | #if DEBUG_FS | ||
2309 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2310 | "Pushing migration block `%s' (%u bytes) to `%s'\n", | ||
2311 | GNUNET_h2s (&mb->query), | ||
2312 | (unsigned int) mb->size, | ||
2313 | GNUNET_i2s (&pid)); | ||
2314 | #endif | ||
2315 | break; | ||
2316 | } | ||
2317 | else | ||
2318 | { | ||
2319 | #if DEBUG_FS | ||
2320 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2321 | "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n", | ||
2322 | GNUNET_h2s (&mb->query), | ||
2323 | (unsigned int) mb->size, | ||
2324 | GNUNET_i2s (&pid)); | ||
2325 | #endif | ||
2326 | } | ||
2327 | } | ||
2328 | if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) || | ||
2329 | (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) ) | ||
2330 | { | ||
2331 | delete_migration_block (mb); | ||
2332 | consider_migration_gathering (); | ||
2333 | } | ||
2334 | } | ||
2335 | consider_migration (NULL, | ||
2336 | &pid.hashPubKey, | ||
2337 | cp); | ||
2338 | } | ||
2339 | #if DEBUG_FS | ||
2340 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2341 | "Transmitting %u bytes to peer with PID %u\n", | ||
2342 | (unsigned int) msize, | ||
2343 | (unsigned int) cp->pid); | ||
2344 | #endif | ||
2345 | return msize; | ||
2346 | } | ||
2347 | |||
2348 | 172 | ||
2349 | /** | 173 | /** |
2350 | * Add a message to the set of pending messages for the given peer. | 174 | * We've just now completed a datastore request. Update our |
175 | * datastore load calculations. | ||
2351 | * | 176 | * |
2352 | * @param cp peer to send message to | 177 | * @param start time when the datastore request was issued |
2353 | * @param pm message to queue | ||
2354 | * @param pr request on which behalf this message is being queued | ||
2355 | */ | 178 | */ |
2356 | static void | 179 | void |
2357 | add_to_pending_messages_for_peer (struct ConnectedPeer *cp, | 180 | GSF_update_datastore_delay_ (struct GNUNET_TIME_Absolute start) |
2358 | struct PendingMessage *pm, | ||
2359 | struct PendingRequest *pr) | ||
2360 | { | 181 | { |
2361 | struct PendingMessage *pos; | 182 | struct GNUNET_TIME_Relative delay; |
2362 | struct PendingMessageList *pml; | ||
2363 | struct GNUNET_PeerIdentity pid; | ||
2364 | 183 | ||
2365 | GNUNET_assert (pm->next == NULL); | 184 | delay = GNUNET_TIME_absolute_get_duration (start); |
2366 | GNUNET_assert (pm->pml == NULL); | 185 | GNUNET_LOAD_update (datastore_get_load, |
2367 | if (pr != NULL) | 186 | delay.rel_value); |
2368 | { | ||
2369 | pml = GNUNET_malloc (sizeof (struct PendingMessageList)); | ||
2370 | pml->req = pr; | ||
2371 | pml->target = cp; | ||
2372 | pml->pm = pm; | ||
2373 | pm->pml = pml; | ||
2374 | GNUNET_CONTAINER_DLL_insert (pr->pending_head, | ||
2375 | pr->pending_tail, | ||
2376 | pml); | ||
2377 | } | ||
2378 | pos = cp->pending_messages_head; | ||
2379 | while ( (pos != NULL) && | ||
2380 | (pm->priority < pos->priority) ) | ||
2381 | pos = pos->next; | ||
2382 | GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head, | ||
2383 | cp->pending_messages_tail, | ||
2384 | pos, | ||
2385 | pm); | ||
2386 | cp->pending_requests++; | ||
2387 | if (cp->pending_requests > MAX_QUEUE_PER_PEER) | ||
2388 | { | ||
2389 | GNUNET_STATISTICS_update (stats, | ||
2390 | gettext_noop ("# P2P searches discarded (queue length bound)"), | ||
2391 | 1, | ||
2392 | GNUNET_NO); | ||
2393 | destroy_pending_message (cp->pending_messages_tail, 0); | ||
2394 | } | ||
2395 | GNUNET_PEER_resolve (cp->pid, &pid); | ||
2396 | if (NULL != cp->cth) | ||
2397 | { | ||
2398 | GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); | ||
2399 | cp->cth = NULL; | ||
2400 | } | ||
2401 | if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK) | ||
2402 | { | ||
2403 | GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task); | ||
2404 | cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; | ||
2405 | } | ||
2406 | /* need to schedule transmission */ | ||
2407 | cp->last_transmission_request_start = GNUNET_TIME_absolute_get (); | ||
2408 | cp->cth = GNUNET_CORE_notify_transmit_ready (core, | ||
2409 | GNUNET_YES, | ||
2410 | cp->pending_messages_head->priority, | ||
2411 | MAX_TRANSMIT_DELAY, | ||
2412 | &pid, | ||
2413 | cp->pending_messages_head->msize, | ||
2414 | &transmit_to_peer, | ||
2415 | cp); | ||
2416 | if (cp->cth == NULL) | ||
2417 | { | ||
2418 | #if DEBUG_FS | ||
2419 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2420 | "Failed to schedule transmission with core!\n"); | ||
2421 | #endif | ||
2422 | GNUNET_STATISTICS_update (stats, | ||
2423 | gettext_noop ("# CORE transmission failures"), | ||
2424 | 1, | ||
2425 | GNUNET_NO); | ||
2426 | } | ||
2427 | } | 187 | } |
2428 | 188 | ||
2429 | 189 | ||
@@ -2436,8 +196,8 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp, | |||
2436 | * GNUNET_NO to process normally (load normal) | 196 | * GNUNET_NO to process normally (load normal) |
2437 | * GNUNET_SYSERR to process for free (load low) | 197 | * GNUNET_SYSERR to process for free (load low) |
2438 | */ | 198 | */ |
2439 | static int | 199 | int |
2440 | test_get_load_too_high (uint32_t priority) | 200 | GSF_test_get_load_too_high_ (uint32_t priority) |
2441 | { | 201 | { |
2442 | double ld; | 202 | double ld; |
2443 | 203 | ||
@@ -2450,1220 +210,80 @@ test_get_load_too_high (uint32_t priority) | |||
2450 | } | 210 | } |
2451 | 211 | ||
2452 | 212 | ||
2453 | |||
2454 | |||
2455 | /** | ||
2456 | * Test if the DATABASE (PUT) load on this peer is too high | ||
2457 | * to even consider processing the query at | ||
2458 | * all. | ||
2459 | * | ||
2460 | * @return GNUNET_YES if the load is too high to do anything (load high) | ||
2461 | * GNUNET_NO to process normally (load normal or low) | ||
2462 | */ | ||
2463 | static int | ||
2464 | test_put_load_too_high (uint32_t priority) | ||
2465 | { | ||
2466 | double ld; | ||
2467 | |||
2468 | if (GNUNET_LOAD_get_average (datastore_put_load) < 50) | ||
2469 | return GNUNET_NO; /* very fast */ | ||
2470 | ld = GNUNET_LOAD_get_load (datastore_put_load); | ||
2471 | if (ld < 2.0 * (1 + priority)) | ||
2472 | return GNUNET_NO; | ||
2473 | GNUNET_STATISTICS_update (stats, | ||
2474 | gettext_noop ("# storage requests dropped due to high load"), | ||
2475 | 1, | ||
2476 | GNUNET_NO); | ||
2477 | return GNUNET_YES; | ||
2478 | } | ||
2479 | |||
2480 | |||
2481 | /* ******************* Pending Request Refresh Task ******************** */ | ||
2482 | |||
2483 | |||
2484 | |||
2485 | /** | ||
2486 | * We use a random delay to make the timing of requests less | ||
2487 | * predictable. This function returns such a random delay. We add a base | ||
2488 | * delay of MAX_CORK_DELAY (1s). | ||
2489 | * | ||
2490 | * FIXME: make schedule dependent on the specifics of the request? | ||
2491 | * Or bandwidth and number of connected peers and load? | ||
2492 | * | ||
2493 | * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms | ||
2494 | */ | ||
2495 | static struct GNUNET_TIME_Relative | ||
2496 | get_processing_delay () | ||
2497 | { | ||
2498 | return | ||
2499 | GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY, | ||
2500 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, | ||
2501 | GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
2502 | TTL_DECREMENT))); | ||
2503 | } | ||
2504 | |||
2505 | |||
2506 | /** | ||
2507 | * We're processing a GET request from another peer and have decided | ||
2508 | * to forward it to other peers. This function is called periodically | ||
2509 | * and should forward the request to other peers until we have all | ||
2510 | * possible replies. If we have transmitted the *only* reply to | ||
2511 | * the initiator we should destroy the pending request. If we have | ||
2512 | * many replies in the queue to the initiator, we should delay sending | ||
2513 | * out more queries until the reply queue has shrunk some. | ||
2514 | * | ||
2515 | * @param cls our "struct ProcessGetContext *" | ||
2516 | * @param tc unused | ||
2517 | */ | ||
2518 | static void | ||
2519 | forward_request_task (void *cls, | ||
2520 | const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
2521 | |||
2522 | |||
2523 | /** | ||
2524 | * Function called after we either failed or succeeded | ||
2525 | * at transmitting a query to a peer. | ||
2526 | * | ||
2527 | * @param cls the requests "struct PendingRequest*" | ||
2528 | * @param tpid ID of receiving peer, 0 on transmission error | ||
2529 | */ | ||
2530 | static void | ||
2531 | transmit_query_continuation (void *cls, | ||
2532 | GNUNET_PEER_Id tpid) | ||
2533 | { | ||
2534 | struct PendingRequest *pr = cls; | ||
2535 | unsigned int i; | ||
2536 | |||
2537 | GNUNET_STATISTICS_update (stats, | ||
2538 | gettext_noop ("# queries scheduled for forwarding"), | ||
2539 | -1, | ||
2540 | GNUNET_NO); | ||
2541 | if (tpid == 0) | ||
2542 | { | ||
2543 | #if DEBUG_FS | ||
2544 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2545 | "Transmission of request failed, will try again later.\n"); | ||
2546 | #endif | ||
2547 | if (pr->task == GNUNET_SCHEDULER_NO_TASK) | ||
2548 | pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (), | ||
2549 | &forward_request_task, | ||
2550 | pr); | ||
2551 | return; | ||
2552 | } | ||
2553 | #if DEBUG_FS | ||
2554 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2555 | "Transmitted query `%s'\n", | ||
2556 | GNUNET_h2s (&pr->query)); | ||
2557 | #endif | ||
2558 | GNUNET_STATISTICS_update (stats, | ||
2559 | gettext_noop ("# queries forwarded"), | ||
2560 | 1, | ||
2561 | GNUNET_NO); | ||
2562 | for (i=0;i<pr->used_targets_off;i++) | ||
2563 | if (pr->used_targets[i].pid == tpid) | ||
2564 | break; /* found match! */ | ||
2565 | if (i == pr->used_targets_off) | ||
2566 | { | ||
2567 | /* need to create new entry */ | ||
2568 | if (pr->used_targets_off == pr->used_targets_size) | ||
2569 | GNUNET_array_grow (pr->used_targets, | ||
2570 | pr->used_targets_size, | ||
2571 | pr->used_targets_size * 2 + 2); | ||
2572 | GNUNET_PEER_change_rc (tpid, 1); | ||
2573 | pr->used_targets[pr->used_targets_off].pid = tpid; | ||
2574 | pr->used_targets[pr->used_targets_off].num_requests = 0; | ||
2575 | i = pr->used_targets_off++; | ||
2576 | } | ||
2577 | pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get (); | ||
2578 | pr->used_targets[i].num_requests++; | ||
2579 | if (pr->task == GNUNET_SCHEDULER_NO_TASK) | ||
2580 | pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (), | ||
2581 | &forward_request_task, | ||
2582 | pr); | ||
2583 | } | ||
2584 | |||
2585 | |||
2586 | /** | ||
2587 | * How many bytes should a bloomfilter be if we have already seen | ||
2588 | * entry_count responses? Note that BLOOMFILTER_K gives us the number | ||
2589 | * of bits set per entry. Furthermore, we should not re-size the | ||
2590 | * filter too often (to keep it cheap). | ||
2591 | * | ||
2592 | * Since other peers will also add entries but not resize the filter, | ||
2593 | * we should generally pick a slightly larger size than what the | ||
2594 | * strict math would suggest. | ||
2595 | * | ||
2596 | * @return must be a power of two and smaller or equal to 2^15. | ||
2597 | */ | ||
2598 | static size_t | ||
2599 | compute_bloomfilter_size (unsigned int entry_count) | ||
2600 | { | ||
2601 | size_t size; | ||
2602 | unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4; | ||
2603 | uint16_t max = 1 << 15; | ||
2604 | |||
2605 | if (entry_count > max) | ||
2606 | return max; | ||
2607 | size = 8; | ||
2608 | while ((size < max) && (size < ideal)) | ||
2609 | size *= 2; | ||
2610 | if (size > max) | ||
2611 | return max; | ||
2612 | return size; | ||
2613 | } | ||
2614 | |||
2615 | |||
2616 | /** | ||
2617 | * Recalculate our bloom filter for filtering replies. This function | ||
2618 | * will create a new bloom filter from scratch, so it should only be | ||
2619 | * called if we have no bloomfilter at all (and hence can create a | ||
2620 | * fresh one of minimal size without problems) OR if our peer is the | ||
2621 | * initiator (in which case we may resize to larger than mimimum size). | ||
2622 | * | ||
2623 | * @param pr request for which the BF is to be recomputed | ||
2624 | */ | ||
2625 | static void | ||
2626 | refresh_bloomfilter (struct PendingRequest *pr) | ||
2627 | { | ||
2628 | unsigned int i; | ||
2629 | size_t nsize; | ||
2630 | GNUNET_HashCode mhash; | ||
2631 | |||
2632 | nsize = compute_bloomfilter_size (pr->replies_seen_off); | ||
2633 | if (nsize == pr->bf_size) | ||
2634 | return; /* size not changed */ | ||
2635 | if (pr->bf != NULL) | ||
2636 | GNUNET_CONTAINER_bloomfilter_free (pr->bf); | ||
2637 | pr->bf_size = nsize; | ||
2638 | pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1); | ||
2639 | pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, | ||
2640 | pr->bf_size, | ||
2641 | BLOOMFILTER_K); | ||
2642 | for (i=0;i<pr->replies_seen_off;i++) | ||
2643 | { | ||
2644 | GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i], | ||
2645 | pr->mingle, | ||
2646 | &mhash); | ||
2647 | GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash); | ||
2648 | } | ||
2649 | } | ||
2650 | |||
2651 | |||
2652 | /** | ||
2653 | * Function called after we've tried to reserve a certain amount of | ||
2654 | * bandwidth for a reply. Check if we succeeded and if so send our | ||
2655 | * query. | ||
2656 | * | ||
2657 | * @param cls the requests "struct PendingRequest*" | ||
2658 | * @param peer identifies the peer | ||
2659 | * @param bpm_out set to the current bandwidth limit (sending) for this peer | ||
2660 | * @param amount set to the amount that was actually reserved or unreserved | ||
2661 | * @param res_delay if the reservation could not be satisfied (amount was 0), how | ||
2662 | * long should the client wait until re-trying? | ||
2663 | * @param preference current traffic preference for the given peer | ||
2664 | */ | ||
2665 | static void | ||
2666 | target_reservation_cb (void *cls, | ||
2667 | const struct | ||
2668 | GNUNET_PeerIdentity * peer, | ||
2669 | struct GNUNET_BANDWIDTH_Value32NBO bpm_out, | ||
2670 | int32_t amount, | ||
2671 | struct GNUNET_TIME_Relative res_delay, | ||
2672 | uint64_t preference) | ||
2673 | { | ||
2674 | struct PendingRequest *pr = cls; | ||
2675 | struct ConnectedPeer *cp; | ||
2676 | struct PendingMessage *pm; | ||
2677 | struct GetMessage *gm; | ||
2678 | GNUNET_HashCode *ext; | ||
2679 | char *bfdata; | ||
2680 | size_t msize; | ||
2681 | unsigned int k; | ||
2682 | int no_route; | ||
2683 | uint32_t bm; | ||
2684 | unsigned int i; | ||
2685 | |||
2686 | #if DEBUG_FS | ||
2687 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2688 | "Core called back... for query `%s'.\n", | ||
2689 | GNUNET_h2s (&pr->query)); | ||
2690 | #endif | ||
2691 | /* (3) transmit, update ttl/priority */ | ||
2692 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
2693 | &peer->hashPubKey); | ||
2694 | if (cp == NULL) | ||
2695 | { | ||
2696 | /* Peer must have just left */ | ||
2697 | #if DEBUG_FS | ||
2698 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2699 | "Selected peer disconnected!\n"); | ||
2700 | #endif | ||
2701 | if (pr->task == GNUNET_SCHEDULER_NO_TASK) | ||
2702 | pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (), | ||
2703 | &forward_request_task, | ||
2704 | pr); | ||
2705 | return; | ||
2706 | } | ||
2707 | cp->irc = NULL; | ||
2708 | pr->pirc = NULL; | ||
2709 | if (peer == NULL) | ||
2710 | { | ||
2711 | /* error in communication with core, try again later */ | ||
2712 | if (pr->task == GNUNET_SCHEDULER_NO_TASK) | ||
2713 | pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (), | ||
2714 | &forward_request_task, | ||
2715 | pr); | ||
2716 | return; | ||
2717 | } | ||
2718 | no_route = GNUNET_NO; | ||
2719 | if (amount == 0) | ||
2720 | { | ||
2721 | if (pr->cp == NULL) | ||
2722 | { | ||
2723 | #if DEBUG_FS > 1 | ||
2724 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2725 | "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n", | ||
2726 | amount, | ||
2727 | DBLOCK_SIZE); | ||
2728 | #endif | ||
2729 | GNUNET_STATISTICS_update (stats, | ||
2730 | gettext_noop ("# reply bandwidth reservation requests failed"), | ||
2731 | 1, | ||
2732 | GNUNET_NO); | ||
2733 | if (pr->task == GNUNET_SCHEDULER_NO_TASK) | ||
2734 | pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (), | ||
2735 | &forward_request_task, | ||
2736 | pr); | ||
2737 | return; /* this target round failed */ | ||
2738 | } | ||
2739 | no_route = GNUNET_YES; | ||
2740 | } | ||
2741 | |||
2742 | GNUNET_STATISTICS_update (stats, | ||
2743 | gettext_noop ("# queries scheduled for forwarding"), | ||
2744 | 1, | ||
2745 | GNUNET_NO); | ||
2746 | for (i=0;i<pr->used_targets_off;i++) | ||
2747 | if (pr->used_targets[i].pid == cp->pid) | ||
2748 | { | ||
2749 | GNUNET_STATISTICS_update (stats, | ||
2750 | gettext_noop ("# queries retransmitted to same target"), | ||
2751 | 1, | ||
2752 | GNUNET_NO); | ||
2753 | break; | ||
2754 | } | ||
2755 | |||
2756 | /* build message and insert message into priority queue */ | ||
2757 | #if DEBUG_FS | ||
2758 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2759 | "Forwarding request `%s' to `%4s'!\n", | ||
2760 | GNUNET_h2s (&pr->query), | ||
2761 | GNUNET_i2s (peer)); | ||
2762 | #endif | ||
2763 | k = 0; | ||
2764 | bm = 0; | ||
2765 | if (GNUNET_YES == no_route) | ||
2766 | { | ||
2767 | bm |= GET_MESSAGE_BIT_RETURN_TO; | ||
2768 | k++; | ||
2769 | } | ||
2770 | if (pr->namespace != NULL) | ||
2771 | { | ||
2772 | bm |= GET_MESSAGE_BIT_SKS_NAMESPACE; | ||
2773 | k++; | ||
2774 | } | ||
2775 | if (pr->target_pid != 0) | ||
2776 | { | ||
2777 | bm |= GET_MESSAGE_BIT_TRANSMIT_TO; | ||
2778 | k++; | ||
2779 | } | ||
2780 | msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode); | ||
2781 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); | ||
2782 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); | ||
2783 | pm->msize = msize; | ||
2784 | gm = (struct GetMessage*) &pm[1]; | ||
2785 | gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); | ||
2786 | gm->header.size = htons (msize); | ||
2787 | gm->type = htonl (pr->type); | ||
2788 | pr->remaining_priority /= 2; | ||
2789 | gm->priority = htonl (pr->remaining_priority); | ||
2790 | gm->ttl = htonl (pr->ttl); | ||
2791 | gm->filter_mutator = htonl(pr->mingle); | ||
2792 | gm->hash_bitmap = htonl (bm); | ||
2793 | gm->query = pr->query; | ||
2794 | ext = (GNUNET_HashCode*) &gm[1]; | ||
2795 | k = 0; | ||
2796 | if (GNUNET_YES == no_route) | ||
2797 | GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]); | ||
2798 | if (pr->namespace != NULL) | ||
2799 | memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode)); | ||
2800 | if (pr->target_pid != 0) | ||
2801 | GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]); | ||
2802 | bfdata = (char *) &ext[k]; | ||
2803 | if (pr->bf != NULL) | ||
2804 | GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, | ||
2805 | bfdata, | ||
2806 | pr->bf_size); | ||
2807 | pm->cont = &transmit_query_continuation; | ||
2808 | pm->cont_cls = pr; | ||
2809 | cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get (); | ||
2810 | add_to_pending_messages_for_peer (cp, pm, pr); | ||
2811 | } | ||
2812 | |||
2813 | |||
2814 | /** | ||
2815 | * Closure used for "target_peer_select_cb". | ||
2816 | */ | ||
2817 | struct PeerSelectionContext | ||
2818 | { | ||
2819 | /** | ||
2820 | * The request for which we are selecting | ||
2821 | * peers. | ||
2822 | */ | ||
2823 | struct PendingRequest *pr; | ||
2824 | |||
2825 | /** | ||
2826 | * Current "prime" target. | ||
2827 | */ | ||
2828 | struct GNUNET_PeerIdentity target; | ||
2829 | |||
2830 | /** | ||
2831 | * How much do we like this target? | ||
2832 | */ | ||
2833 | double target_score; | ||
2834 | |||
2835 | /** | ||
2836 | * Does it make sense to we re-try quickly again? | ||
2837 | */ | ||
2838 | int fast_retry; | ||
2839 | |||
2840 | }; | ||
2841 | |||
2842 | |||
2843 | /** | ||
2844 | * Function called for each connected peer to determine | ||
2845 | * which one(s) would make good targets for forwarding. | ||
2846 | * | ||
2847 | * @param cls closure (struct PeerSelectionContext) | ||
2848 | * @param key current key code (peer identity) | ||
2849 | * @param value value in the hash map (struct ConnectedPeer) | ||
2850 | * @return GNUNET_YES if we should continue to | ||
2851 | * iterate, | ||
2852 | * GNUNET_NO if not. | ||
2853 | */ | ||
2854 | static int | ||
2855 | target_peer_select_cb (void *cls, | ||
2856 | const GNUNET_HashCode * key, | ||
2857 | void *value) | ||
2858 | { | ||
2859 | struct PeerSelectionContext *psc = cls; | ||
2860 | struct ConnectedPeer *cp = value; | ||
2861 | struct PendingRequest *pr = psc->pr; | ||
2862 | struct GNUNET_TIME_Relative delay; | ||
2863 | double score; | ||
2864 | unsigned int i; | ||
2865 | unsigned int pc; | ||
2866 | |||
2867 | /* 1) check that this peer is not the initiator */ | ||
2868 | if (cp == pr->cp) | ||
2869 | { | ||
2870 | #if DEBUG_FS | ||
2871 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2872 | "Skipping initiator in forwarding selection\n"); | ||
2873 | #endif | ||
2874 | return GNUNET_YES; /* skip */ | ||
2875 | } | ||
2876 | if (cp->irc != NULL) | ||
2877 | { | ||
2878 | psc->fast_retry = GNUNET_YES; | ||
2879 | return GNUNET_YES; /* skip: already querying core about this peer for other reasons */ | ||
2880 | } | ||
2881 | |||
2882 | /* 2) check if we have already (recently) forwarded to this peer */ | ||
2883 | /* 2a) this particular request */ | ||
2884 | pc = 0; | ||
2885 | for (i=0;i<pr->used_targets_off;i++) | ||
2886 | if (pr->used_targets[i].pid == cp->pid) | ||
2887 | { | ||
2888 | pc = pr->used_targets[i].num_requests; | ||
2889 | GNUNET_assert (pc > 0); | ||
2890 | /* FIXME: make re-enabling a peer independent of how often | ||
2891 | this function is called??? */ | ||
2892 | if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
2893 | RETRY_PROBABILITY_INV * pc)) | ||
2894 | { | ||
2895 | #if DEBUG_FS | ||
2896 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2897 | "NOT re-trying query that was previously transmitted %u times\n", | ||
2898 | (unsigned int) pc); | ||
2899 | #endif | ||
2900 | return GNUNET_YES; /* skip */ | ||
2901 | } | ||
2902 | break; | ||
2903 | } | ||
2904 | #if DEBUG_FS | ||
2905 | if (0 < pc) | ||
2906 | { | ||
2907 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
2908 | "Re-trying query that was previously transmitted %u times to this peer\n", | ||
2909 | (unsigned int) pc); | ||
2910 | } | ||
2911 | #endif | ||
2912 | /* 2b) many other requests to this peer */ | ||
2913 | delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]); | ||
2914 | if (delay.rel_value <= cp->avg_delay.rel_value) | ||
2915 | { | ||
2916 | #if DEBUG_FS | ||
2917 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2918 | "NOT sending query since we send %u others to this peer in the last %llums\n", | ||
2919 | MAX_QUEUE_PER_PEER, | ||
2920 | cp->avg_delay.rel_value); | ||
2921 | #endif | ||
2922 | return GNUNET_YES; /* skip */ | ||
2923 | } | ||
2924 | |||
2925 | /* 3) calculate how much we'd like to forward to this peer, | ||
2926 | starting with a random value that is strong enough | ||
2927 | to at least give any peer a chance sometimes | ||
2928 | (compared to the other factors that come later) */ | ||
2929 | /* 3a) count successful (recent) routes from cp for same source */ | ||
2930 | if (pr->cp != NULL) | ||
2931 | { | ||
2932 | score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
2933 | P2P_SUCCESS_LIST_SIZE); | ||
2934 | for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++) | ||
2935 | if (cp->last_p2p_replies[i] == pr->cp->pid) | ||
2936 | score += 1.0; /* likely successful based on hot path */ | ||
2937 | } | ||
2938 | else | ||
2939 | { | ||
2940 | score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
2941 | CS2P_SUCCESS_LIST_SIZE); | ||
2942 | for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++) | ||
2943 | if (cp->last_client_replies[i] == pr->client_request_list->client_list->client) | ||
2944 | score += 1.0; /* likely successful based on hot path */ | ||
2945 | } | ||
2946 | /* 3b) include latency */ | ||
2947 | if (cp->avg_delay.rel_value < 4 * TTL_DECREMENT) | ||
2948 | score += 1.0; /* likely fast based on latency */ | ||
2949 | /* 3c) include priorities */ | ||
2950 | if (cp->avg_priority <= pr->remaining_priority / 2.0) | ||
2951 | score += 1.0; /* likely successful based on priorities */ | ||
2952 | /* 3d) penalize for queue size */ | ||
2953 | score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); | ||
2954 | /* 3e) include peer proximity */ | ||
2955 | score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key, | ||
2956 | &pr->query)) / (double) UINT32_MAX); | ||
2957 | /* 4) super-bonus for being the known target */ | ||
2958 | if (pr->target_pid == cp->pid) | ||
2959 | score += 100.0; | ||
2960 | /* store best-fit in closure */ | ||
2961 | score++; /* avoid zero */ | ||
2962 | if (score > psc->target_score) | ||
2963 | { | ||
2964 | psc->target_score = score; | ||
2965 | psc->target.hashPubKey = *key; | ||
2966 | } | ||
2967 | #if DEBUG_FS | ||
2968 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2969 | "Peer `%s' gets score %f for forwarding query, max is %8f\n", | ||
2970 | GNUNET_h2s (key), | ||
2971 | score, | ||
2972 | psc->target_score); | ||
2973 | #endif | ||
2974 | return GNUNET_YES; | ||
2975 | } | ||
2976 | |||
2977 | |||
2978 | /** | ||
2979 | * The priority level imposes a bound on the maximum | ||
2980 | * value for the ttl that can be requested. | ||
2981 | * | ||
2982 | * @param ttl_in requested ttl | ||
2983 | * @param prio given priority | ||
2984 | * @return ttl_in if ttl_in is below the limit, | ||
2985 | * otherwise the ttl-limit for the given priority | ||
2986 | */ | ||
2987 | static int32_t | ||
2988 | bound_ttl (int32_t ttl_in, uint32_t prio) | ||
2989 | { | ||
2990 | unsigned long long allowed; | ||
2991 | |||
2992 | if (ttl_in <= 0) | ||
2993 | return ttl_in; | ||
2994 | allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; | ||
2995 | if (ttl_in > allowed) | ||
2996 | { | ||
2997 | if (allowed >= (1 << 30)) | ||
2998 | return 1 << 30; | ||
2999 | return allowed; | ||
3000 | } | ||
3001 | return ttl_in; | ||
3002 | } | ||
3003 | |||
3004 | |||
3005 | /** | ||
3006 | * Iterator called on each result obtained for a DHT | ||
3007 | * operation that expects a reply | ||
3008 | * | ||
3009 | * @param cls closure | ||
3010 | * @param exp when will this value expire | ||
3011 | * @param key key of the result | ||
3012 | * @param get_path NULL-terminated array of pointers | ||
3013 | * to the peers on reverse GET path (or NULL if not recorded) | ||
3014 | * @param put_path NULL-terminated array of pointers | ||
3015 | * to the peers on the PUT path (or NULL if not recorded) | ||
3016 | * @param type type of the result | ||
3017 | * @param size number of bytes in data | ||
3018 | * @param data pointer to the result data | ||
3019 | */ | ||
3020 | static void | ||
3021 | process_dht_reply (void *cls, | ||
3022 | struct GNUNET_TIME_Absolute exp, | ||
3023 | const GNUNET_HashCode * key, | ||
3024 | const struct GNUNET_PeerIdentity * const *get_path, | ||
3025 | const struct GNUNET_PeerIdentity * const *put_path, | ||
3026 | enum GNUNET_BLOCK_Type type, | ||
3027 | size_t size, | ||
3028 | const void *data); | ||
3029 | |||
3030 | |||
3031 | /** | ||
3032 | * We're processing a GET request and have decided | ||
3033 | * to forward it to other peers. This function is called periodically | ||
3034 | * and should forward the request to other peers until we have all | ||
3035 | * possible replies. If we have transmitted the *only* reply to | ||
3036 | * the initiator we should destroy the pending request. If we have | ||
3037 | * many replies in the queue to the initiator, we should delay sending | ||
3038 | * out more queries until the reply queue has shrunk some. | ||
3039 | * | ||
3040 | * @param cls our "struct ProcessGetContext *" | ||
3041 | * @param tc unused | ||
3042 | */ | ||
3043 | static void | ||
3044 | forward_request_task (void *cls, | ||
3045 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
3046 | { | ||
3047 | struct PendingRequest *pr = cls; | ||
3048 | struct PeerSelectionContext psc; | ||
3049 | struct ConnectedPeer *cp; | ||
3050 | struct GNUNET_TIME_Relative delay; | ||
3051 | |||
3052 | pr->task = GNUNET_SCHEDULER_NO_TASK; | ||
3053 | if (pr->pirc != NULL) | ||
3054 | { | ||
3055 | #if DEBUG_FS | ||
3056 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3057 | "Forwarding of query `%s' not attempted due to pending local lookup!\n", | ||
3058 | GNUNET_h2s (&pr->query)); | ||
3059 | #endif | ||
3060 | return; /* already pending */ | ||
3061 | } | ||
3062 | if (GNUNET_YES == pr->local_only) | ||
3063 | return; /* configured to not do P2P search */ | ||
3064 | /* (0) try DHT */ | ||
3065 | if ( (0 == pr->anonymity_level) && | ||
3066 | (GNUNET_YES != pr->forward_only) && | ||
3067 | (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) && | ||
3068 | (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) ) | ||
3069 | { | ||
3070 | pr->dht_get = GNUNET_DHT_get_start (dht_handle, | ||
3071 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
3072 | pr->type, | ||
3073 | &pr->query, | ||
3074 | DEFAULT_GET_REPLICATION, | ||
3075 | GNUNET_DHT_RO_NONE, | ||
3076 | pr->bf, | ||
3077 | pr->mingle, | ||
3078 | pr->namespace, | ||
3079 | (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0, | ||
3080 | &process_dht_reply, | ||
3081 | pr); | ||
3082 | } | ||
3083 | |||
3084 | if ( (pr->anonymity_level > 1) && | ||
3085 | (cover_query_count < pr->anonymity_level - 1) ) | ||
3086 | { | ||
3087 | delay = get_processing_delay (); | ||
3088 | #if DEBUG_FS | ||
3089 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3090 | "Not enough cover traffic to forward query `%s', will try again in %llu ms!\n", | ||
3091 | GNUNET_h2s (&pr->query), | ||
3092 | delay.rel_value); | ||
3093 | #endif | ||
3094 | pr->task = GNUNET_SCHEDULER_add_delayed (delay, | ||
3095 | &forward_request_task, | ||
3096 | pr); | ||
3097 | return; | ||
3098 | } | ||
3099 | /* consume cover traffic */ | ||
3100 | if (pr->anonymity_level > 1) | ||
3101 | cover_query_count -= pr->anonymity_level - 1; | ||
3102 | |||
3103 | /* (1) select target */ | ||
3104 | psc.pr = pr; | ||
3105 | psc.target_score = -DBL_MAX; | ||
3106 | psc.fast_retry = GNUNET_NO; | ||
3107 | GNUNET_CONTAINER_multihashmap_iterate (connected_peers, | ||
3108 | &target_peer_select_cb, | ||
3109 | &psc); | ||
3110 | if (psc.target_score == -DBL_MAX) | ||
3111 | { | ||
3112 | if (psc.fast_retry == GNUNET_YES) | ||
3113 | delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */ | ||
3114 | else | ||
3115 | delay = get_processing_delay (); | ||
3116 | #if DEBUG_FS | ||
3117 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3118 | "No peer selected for forwarding of query `%s', will try again in %llu ms!\n", | ||
3119 | GNUNET_h2s (&pr->query), | ||
3120 | delay.rel_value); | ||
3121 | #endif | ||
3122 | pr->task = GNUNET_SCHEDULER_add_delayed (delay, | ||
3123 | &forward_request_task, | ||
3124 | pr); | ||
3125 | return; /* nobody selected */ | ||
3126 | } | ||
3127 | /* (3) update TTL/priority */ | ||
3128 | if (pr->client_request_list != NULL) | ||
3129 | { | ||
3130 | /* FIXME: use better algorithm!? */ | ||
3131 | if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
3132 | 4)) | ||
3133 | pr->priority++; | ||
3134 | /* bound priority we use by priorities we see from other peers | ||
3135 | rounded up (must round up so that we can see non-zero | ||
3136 | priorities, but round up as little as possible to make it | ||
3137 | plausible that we forwarded another peers request) */ | ||
3138 | if (pr->priority > current_priorities + 1.0) | ||
3139 | pr->priority = (uint32_t) current_priorities + 1.0; | ||
3140 | pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2, | ||
3141 | pr->priority); | ||
3142 | #if DEBUG_FS | ||
3143 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3144 | "Trying query `%s' with priority %u and TTL %d.\n", | ||
3145 | GNUNET_h2s (&pr->query), | ||
3146 | pr->priority, | ||
3147 | pr->ttl); | ||
3148 | #endif | ||
3149 | } | ||
3150 | |||
3151 | /* (3) reserve reply bandwidth */ | ||
3152 | if (GNUNET_NO == pr->forward_only) | ||
3153 | { | ||
3154 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
3155 | &psc.target.hashPubKey); | ||
3156 | GNUNET_assert (NULL != cp); | ||
3157 | GNUNET_assert (cp->irc == NULL); | ||
3158 | pr->pirc = cp; | ||
3159 | cp->pr = pr; | ||
3160 | #if DEBUG_FS | ||
3161 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3162 | "Asking core for bandwidth for query `%s'.\n", | ||
3163 | GNUNET_h2s (&pr->query)); | ||
3164 | #endif | ||
3165 | cp->irc = GNUNET_CORE_peer_change_preference (core, | ||
3166 | &psc.target, | ||
3167 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
3168 | GNUNET_BANDWIDTH_value_init (UINT32_MAX), | ||
3169 | DBLOCK_SIZE * 2, | ||
3170 | cp->inc_preference, | ||
3171 | &target_reservation_cb, | ||
3172 | pr); | ||
3173 | GNUNET_assert (cp->irc != NULL); | ||
3174 | cp->inc_preference = 0; | ||
3175 | } | ||
3176 | else | ||
3177 | { | ||
3178 | /* force forwarding */ | ||
3179 | static struct GNUNET_BANDWIDTH_Value32NBO zerobw; | ||
3180 | target_reservation_cb (pr, &psc.target, | ||
3181 | zerobw, 0, | ||
3182 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
3183 | 0.0); | ||
3184 | } | ||
3185 | } | ||
3186 | |||
3187 | |||
3188 | /* **************************** P2P PUT Handling ************************ */ | ||
3189 | |||
3190 | |||
3191 | /** | ||
3192 | * Function called after we either failed or succeeded | ||
3193 | * at transmitting a reply to a peer. | ||
3194 | * | ||
3195 | * @param cls the requests "struct PendingRequest*" | ||
3196 | * @param tpid ID of receiving peer, 0 on transmission error | ||
3197 | */ | ||
3198 | static void | ||
3199 | transmit_reply_continuation (void *cls, | ||
3200 | GNUNET_PEER_Id tpid) | ||
3201 | { | ||
3202 | struct PendingRequest *pr = cls; | ||
3203 | |||
3204 | switch (pr->type) | ||
3205 | { | ||
3206 | case GNUNET_BLOCK_TYPE_FS_DBLOCK: | ||
3207 | case GNUNET_BLOCK_TYPE_FS_IBLOCK: | ||
3208 | /* only one reply expected, done with the request! */ | ||
3209 | destroy_pending_request (pr); | ||
3210 | break; | ||
3211 | case GNUNET_BLOCK_TYPE_ANY: | ||
3212 | case GNUNET_BLOCK_TYPE_FS_KBLOCK: | ||
3213 | case GNUNET_BLOCK_TYPE_FS_SBLOCK: | ||
3214 | break; | ||
3215 | default: | ||
3216 | GNUNET_break (0); | ||
3217 | break; | ||
3218 | } | ||
3219 | } | ||
3220 | |||
3221 | |||
3222 | /** | ||
3223 | * Transmit the given message by copying it to the target buffer | ||
3224 | * "buf". "buf" will be NULL and "size" zero if the socket was closed | ||
3225 | * for writing in the meantime. In that case, do nothing | ||
3226 | * (the disconnect or shutdown handler will take care of the rest). | ||
3227 | * If we were able to transmit messages and there are still more | ||
3228 | * pending, ask core again for further calls to this function. | ||
3229 | * | ||
3230 | * @param cls closure, pointer to the 'struct ClientList*' | ||
3231 | * @param size number of bytes available in buf | ||
3232 | * @param buf where the callee should write the message | ||
3233 | * @return number of bytes written to buf | ||
3234 | */ | ||
3235 | static size_t | ||
3236 | transmit_to_client (void *cls, | ||
3237 | size_t size, void *buf) | ||
3238 | { | ||
3239 | struct ClientList *cl = cls; | ||
3240 | char *cbuf = buf; | ||
3241 | struct ClientResponseMessage *creply; | ||
3242 | size_t msize; | ||
3243 | |||
3244 | cl->th = NULL; | ||
3245 | if (NULL == buf) | ||
3246 | { | ||
3247 | #if DEBUG_FS | ||
3248 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3249 | "Not sending reply, client communication problem.\n"); | ||
3250 | #endif | ||
3251 | return 0; | ||
3252 | } | ||
3253 | msize = 0; | ||
3254 | while ( (NULL != (creply = cl->res_head) ) && | ||
3255 | (creply->msize <= size) ) | ||
3256 | { | ||
3257 | memcpy (&cbuf[msize], &creply[1], creply->msize); | ||
3258 | msize += creply->msize; | ||
3259 | size -= creply->msize; | ||
3260 | GNUNET_CONTAINER_DLL_remove (cl->res_head, | ||
3261 | cl->res_tail, | ||
3262 | creply); | ||
3263 | GNUNET_free (creply); | ||
3264 | } | ||
3265 | if (NULL != creply) | ||
3266 | cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client, | ||
3267 | creply->msize, | ||
3268 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
3269 | &transmit_to_client, | ||
3270 | cl); | ||
3271 | #if DEBUG_FS | ||
3272 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3273 | "Transmitted %u bytes to client\n", | ||
3274 | (unsigned int) msize); | ||
3275 | #endif | ||
3276 | return msize; | ||
3277 | } | ||
3278 | |||
3279 | |||
3280 | /** | ||
3281 | * Closure for "process_reply" function. | ||
3282 | */ | ||
3283 | struct ProcessReplyClosure | ||
3284 | { | ||
3285 | /** | ||
3286 | * The data for the reply. | ||
3287 | */ | ||
3288 | const void *data; | ||
3289 | |||
3290 | /** | ||
3291 | * Who gave us this reply? NULL for local host (or DHT) | ||
3292 | */ | ||
3293 | struct ConnectedPeer *sender; | ||
3294 | |||
3295 | /** | ||
3296 | * When the reply expires. | ||
3297 | */ | ||
3298 | struct GNUNET_TIME_Absolute expiration; | ||
3299 | |||
3300 | /** | ||
3301 | * Size of data. | ||
3302 | */ | ||
3303 | size_t size; | ||
3304 | |||
3305 | /** | ||
3306 | * Type of the block. | ||
3307 | */ | ||
3308 | enum GNUNET_BLOCK_Type type; | ||
3309 | |||
3310 | /** | ||
3311 | * How much was this reply worth to us? | ||
3312 | */ | ||
3313 | uint32_t priority; | ||
3314 | |||
3315 | /** | ||
3316 | * Anonymity requirements for this reply. | ||
3317 | */ | ||
3318 | uint32_t anonymity_level; | ||
3319 | |||
3320 | /** | ||
3321 | * Evaluation result (returned). | ||
3322 | */ | ||
3323 | enum GNUNET_BLOCK_EvaluationResult eval; | ||
3324 | |||
3325 | /** | ||
3326 | * Did we finish processing the associated request? | ||
3327 | */ | ||
3328 | int finished; | ||
3329 | |||
3330 | /** | ||
3331 | * Did we find a matching request? | ||
3332 | */ | ||
3333 | int request_found; | ||
3334 | }; | ||
3335 | |||
3336 | |||
3337 | /** | 213 | /** |
3338 | * We have received a reply; handle it! | 214 | * Handle P2P "PUT" message. |
3339 | * | 215 | * |
3340 | * @param cls response (struct ProcessReplyClosure) | 216 | * @param cls closure, always NULL |
3341 | * @param key our query | 217 | * @param other the other peer involved (sender or receiver, NULL |
3342 | * @param value value in the hash map (info about the query) | 218 | * for loopback messages where we are both sender and receiver) |
3343 | * @return GNUNET_YES (we should continue to iterate) | 219 | * @param message the actual message |
220 | * @param atsi performance information | ||
221 | * @return GNUNET_OK to keep the connection open, | ||
222 | * GNUNET_SYSERR to close it (signal serious error) | ||
3344 | */ | 223 | */ |
3345 | static int | 224 | static int |
3346 | process_reply (void *cls, | 225 | handle_p2p_put (void *cls, |
3347 | const GNUNET_HashCode * key, | 226 | const struct GNUNET_PeerIdentity *other, |
3348 | void *value) | 227 | const struct GNUNET_MessageHeader *message, |
228 | const struct GNUNET_TRANSPORT_ATS_Information *atsi) | ||
3349 | { | 229 | { |
3350 | struct ProcessReplyClosure *prq = cls; | 230 | struct GSF_ConnectedPeer *cp; |
3351 | struct PendingRequest *pr = value; | ||
3352 | struct PendingMessage *reply; | ||
3353 | struct ClientResponseMessage *creply; | ||
3354 | struct ClientList *cl; | ||
3355 | struct PutMessage *pm; | ||
3356 | struct ConnectedPeer *cp; | ||
3357 | struct GNUNET_TIME_Relative cur_delay; | ||
3358 | #if SUPPORT_DELAYS | ||
3359 | struct GNUNET_TIME_Relative art_delay; | ||
3360 | #endif | ||
3361 | size_t msize; | ||
3362 | unsigned int i; | ||
3363 | 231 | ||
3364 | if (NULL == pr->client_request_list) | 232 | cp = GSF_peer_get_ (other); |
3365 | { | 233 | if (NULL == cp) |
3366 | /* reply will go over the network, check for cover traffic */ | ||
3367 | if ( (prq->anonymity_level > 1) && | ||
3368 | (cover_content_count < prq->anonymity_level - 1) ) | ||
3369 | { | ||
3370 | /* insufficient cover traffic, skip */ | ||
3371 | GNUNET_STATISTICS_update (stats, | ||
3372 | gettext_noop ("# replies suppressed due to lack of cover traffic"), | ||
3373 | 1, | ||
3374 | GNUNET_NO); | ||
3375 | return GNUNET_YES; | ||
3376 | } | ||
3377 | if (prq->anonymity_level > 1) | ||
3378 | cover_content_count -= prq->anonymity_level - 1; | ||
3379 | } | ||
3380 | #if DEBUG_FS | ||
3381 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3382 | "Matched result (type %u) for query `%s' with pending request\n", | ||
3383 | (unsigned int) prq->type, | ||
3384 | GNUNET_h2s (key)); | ||
3385 | #endif | ||
3386 | GNUNET_STATISTICS_update (stats, | ||
3387 | gettext_noop ("# replies received and matched"), | ||
3388 | 1, | ||
3389 | GNUNET_NO); | ||
3390 | if (prq->sender != NULL) | ||
3391 | { | 234 | { |
3392 | for (i=0;i<pr->used_targets_off;i++) | ||
3393 | if (pr->used_targets[i].pid == prq->sender->pid) | ||
3394 | break; | ||
3395 | if (i < pr->used_targets_off) | ||
3396 | { | ||
3397 | cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time); | ||
3398 | prq->sender->avg_delay.rel_value | ||
3399 | = (prq->sender->avg_delay.rel_value * | ||
3400 | (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; | ||
3401 | prq->sender->avg_priority | ||
3402 | = (prq->sender->avg_priority * | ||
3403 | (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N; | ||
3404 | } | ||
3405 | if (pr->cp != NULL) | ||
3406 | { | ||
3407 | GNUNET_PEER_change_rc (prq->sender->last_p2p_replies | ||
3408 | [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], | ||
3409 | -1); | ||
3410 | GNUNET_PEER_change_rc (pr->cp->pid, 1); | ||
3411 | prq->sender->last_p2p_replies | ||
3412 | [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE] | ||
3413 | = pr->cp->pid; | ||
3414 | } | ||
3415 | else | ||
3416 | { | ||
3417 | if (NULL != prq->sender->last_client_replies | ||
3418 | [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]) | ||
3419 | GNUNET_SERVER_client_drop (prq->sender->last_client_replies | ||
3420 | [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]); | ||
3421 | prq->sender->last_client_replies | ||
3422 | [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE] | ||
3423 | = pr->client_request_list->client_list->client; | ||
3424 | GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client); | ||
3425 | } | ||
3426 | } | ||
3427 | prq->eval = GNUNET_BLOCK_evaluate (block_ctx, | ||
3428 | prq->type, | ||
3429 | key, | ||
3430 | &pr->bf, | ||
3431 | pr->mingle, | ||
3432 | pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0, | ||
3433 | prq->data, | ||
3434 | prq->size); | ||
3435 | switch (prq->eval) | ||
3436 | { | ||
3437 | case GNUNET_BLOCK_EVALUATION_OK_MORE: | ||
3438 | break; | ||
3439 | case GNUNET_BLOCK_EVALUATION_OK_LAST: | ||
3440 | while (NULL != pr->pending_head) | ||
3441 | destroy_pending_message_list_entry (pr->pending_head); | ||
3442 | if (pr->qe != NULL) | ||
3443 | { | ||
3444 | if (pr->client_request_list != NULL) | ||
3445 | GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, | ||
3446 | GNUNET_YES); | ||
3447 | GNUNET_DATASTORE_cancel (pr->qe); | ||
3448 | pr->qe = NULL; | ||
3449 | } | ||
3450 | pr->do_remove = GNUNET_YES; | ||
3451 | if (pr->task != GNUNET_SCHEDULER_NO_TASK) | ||
3452 | { | ||
3453 | GNUNET_SCHEDULER_cancel (pr->task); | ||
3454 | pr->task = GNUNET_SCHEDULER_NO_TASK; | ||
3455 | } | ||
3456 | GNUNET_break (GNUNET_YES == | ||
3457 | GNUNET_CONTAINER_multihashmap_remove (query_request_map, | ||
3458 | key, | ||
3459 | pr)); | ||
3460 | GNUNET_LOAD_update (rt_entry_lifetime, | ||
3461 | GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value); | ||
3462 | break; | ||
3463 | case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: | ||
3464 | GNUNET_STATISTICS_update (stats, | ||
3465 | gettext_noop ("# duplicate replies discarded (bloomfilter)"), | ||
3466 | 1, | ||
3467 | GNUNET_NO); | ||
3468 | #if DEBUG_FS | ||
3469 | /* GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3470 | "Duplicate response `%s', discarding.\n", | ||
3471 | GNUNET_h2s (&mhash));*/ | ||
3472 | #endif | ||
3473 | return GNUNET_YES; /* duplicate */ | ||
3474 | case GNUNET_BLOCK_EVALUATION_RESULT_INVALID: | ||
3475 | return GNUNET_YES; /* wrong namespace */ | ||
3476 | case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: | ||
3477 | GNUNET_break (0); | ||
3478 | return GNUNET_YES; | ||
3479 | case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: | ||
3480 | GNUNET_break (0); | 235 | GNUNET_break (0); |
3481 | return GNUNET_YES; | 236 | return GNUNET_OK; |
3482 | case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: | ||
3483 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
3484 | _("Unsupported block type %u\n"), | ||
3485 | prq->type); | ||
3486 | return GNUNET_NO; | ||
3487 | } | ||
3488 | if (pr->client_request_list != NULL) | ||
3489 | { | ||
3490 | if (pr->replies_seen_size == pr->replies_seen_off) | ||
3491 | GNUNET_array_grow (pr->replies_seen, | ||
3492 | pr->replies_seen_size, | ||
3493 | pr->replies_seen_size * 2 + 4); | ||
3494 | GNUNET_CRYPTO_hash (prq->data, | ||
3495 | prq->size, | ||
3496 | &pr->replies_seen[pr->replies_seen_off++]); | ||
3497 | refresh_bloomfilter (pr); | ||
3498 | } | ||
3499 | if (NULL == prq->sender) | ||
3500 | { | ||
3501 | #if DEBUG_FS | ||
3502 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3503 | "Found result for query `%s' in local datastore\n", | ||
3504 | GNUNET_h2s (key)); | ||
3505 | #endif | ||
3506 | GNUNET_STATISTICS_update (stats, | ||
3507 | gettext_noop ("# results found locally"), | ||
3508 | 1, | ||
3509 | GNUNET_NO); | ||
3510 | } | ||
3511 | prq->priority += pr->remaining_priority; | ||
3512 | pr->remaining_priority = 0; | ||
3513 | pr->results_found++; | ||
3514 | prq->request_found = GNUNET_YES; | ||
3515 | if (NULL != pr->client_request_list) | ||
3516 | { | ||
3517 | GNUNET_STATISTICS_update (stats, | ||
3518 | gettext_noop ("# replies received for local clients"), | ||
3519 | 1, | ||
3520 | GNUNET_NO); | ||
3521 | cl = pr->client_request_list->client_list; | ||
3522 | msize = sizeof (struct PutMessage) + prq->size; | ||
3523 | creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage)); | ||
3524 | creply->msize = msize; | ||
3525 | creply->client_list = cl; | ||
3526 | GNUNET_CONTAINER_DLL_insert_after (cl->res_head, | ||
3527 | cl->res_tail, | ||
3528 | cl->res_tail, | ||
3529 | creply); | ||
3530 | pm = (struct PutMessage*) &creply[1]; | ||
3531 | pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); | ||
3532 | pm->header.size = htons (msize); | ||
3533 | pm->type = htonl (prq->type); | ||
3534 | pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); | ||
3535 | memcpy (&pm[1], prq->data, prq->size); | ||
3536 | if (NULL == cl->th) | ||
3537 | { | ||
3538 | #if DEBUG_FS | ||
3539 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3540 | "Transmitting result for query `%s' to client\n", | ||
3541 | GNUNET_h2s (key)); | ||
3542 | #endif | ||
3543 | cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client, | ||
3544 | msize, | ||
3545 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
3546 | &transmit_to_client, | ||
3547 | cl); | ||
3548 | } | ||
3549 | GNUNET_break (cl->th != NULL); | ||
3550 | if (pr->do_remove) | ||
3551 | { | ||
3552 | prq->finished = GNUNET_YES; | ||
3553 | destroy_pending_request (pr); | ||
3554 | } | ||
3555 | } | ||
3556 | else | ||
3557 | { | ||
3558 | cp = pr->cp; | ||
3559 | #if DEBUG_FS | ||
3560 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3561 | "Transmitting result for query `%s' to other peer (PID=%u)\n", | ||
3562 | GNUNET_h2s (key), | ||
3563 | (unsigned int) cp->pid); | ||
3564 | #endif | ||
3565 | GNUNET_STATISTICS_update (stats, | ||
3566 | gettext_noop ("# replies received for other peers"), | ||
3567 | 1, | ||
3568 | GNUNET_NO); | ||
3569 | msize = sizeof (struct PutMessage) + prq->size; | ||
3570 | reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); | ||
3571 | reply->cont = &transmit_reply_continuation; | ||
3572 | reply->cont_cls = pr; | ||
3573 | #if SUPPORT_DELAYS | ||
3574 | art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, | ||
3575 | GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
3576 | TTL_DECREMENT)); | ||
3577 | reply->delay_until | ||
3578 | = GNUNET_TIME_relative_to_absolute (art_delay); | ||
3579 | GNUNET_STATISTICS_update (stats, | ||
3580 | gettext_noop ("cummulative artificial delay introduced (ms)"), | ||
3581 | art_delay.abs_value, | ||
3582 | GNUNET_NO); | ||
3583 | #endif | ||
3584 | reply->msize = msize; | ||
3585 | reply->priority = UINT32_MAX; /* send replies first! */ | ||
3586 | pm = (struct PutMessage*) &reply[1]; | ||
3587 | pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); | ||
3588 | pm->header.size = htons (msize); | ||
3589 | pm->type = htonl (prq->type); | ||
3590 | pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); | ||
3591 | memcpy (&pm[1], prq->data, prq->size); | ||
3592 | add_to_pending_messages_for_peer (cp, reply, pr); | ||
3593 | } | 237 | } |
3594 | return GNUNET_YES; | 238 | return GSF_handle_p2p_content_ (cp, message); |
3595 | } | 239 | } |
3596 | 240 | ||
3597 | 241 | ||
3598 | /** | 242 | /** |
3599 | * Iterator called on each result obtained for a DHT | 243 | * We have a new request, consider forwarding it to the given |
3600 | * operation that expects a reply | 244 | * peer. |
3601 | * | 245 | * |
3602 | * @param cls closure | 246 | * @param cls the 'struct GSF_PendingRequest' |
3603 | * @param exp when will this value expire | 247 | * @param peer identity of the peer |
3604 | * @param key key of the result | 248 | * @param cp handle to the connected peer record |
3605 | * @param get_path NULL-terminated array of pointers | 249 | * @param ppd peer performance data |
3606 | * to the peers on reverse GET path (or NULL if not recorded) | ||
3607 | * @param put_path NULL-terminated array of pointers | ||
3608 | * to the peers on the PUT path (or NULL if not recorded) | ||
3609 | * @param type type of the result | ||
3610 | * @param size number of bytes in data | ||
3611 | * @param data pointer to the result data | ||
3612 | */ | 250 | */ |
3613 | static void | 251 | static void |
3614 | process_dht_reply (void *cls, | 252 | consider_request_for_forwarding (void *cls, |
3615 | struct GNUNET_TIME_Absolute exp, | 253 | const struct GNUNET_PeerIdentity *peer, |
3616 | const GNUNET_HashCode * key, | 254 | struct GSF_ConnectedPeer *cp, |
3617 | const struct GNUNET_PeerIdentity * const *get_path, | 255 | const struct GSF_PeerPerformanceData *ppd) |
3618 | const struct GNUNET_PeerIdentity * const *put_path, | ||
3619 | enum GNUNET_BLOCK_Type type, | ||
3620 | size_t size, | ||
3621 | const void *data) | ||
3622 | { | 256 | { |
3623 | struct PendingRequest *pr = cls; | 257 | struct GSF_PendingRequest *pr = cls; |
3624 | struct ProcessReplyClosure prq; | ||
3625 | 258 | ||
3626 | memset (&prq, 0, sizeof (prq)); | 259 | GSF_plan_add_ (cp, pr); |
3627 | prq.data = data; | ||
3628 | prq.expiration = exp; | ||
3629 | prq.size = size; | ||
3630 | prq.type = type; | ||
3631 | process_reply (&prq, key, pr); | ||
3632 | } | 260 | } |
3633 | 261 | ||
3634 | 262 | ||
3635 | |||
3636 | /** | 263 | /** |
3637 | * Continuation called to notify client about result of the | 264 | * Function to be called after we're done processing |
3638 | * operation. | 265 | * replies from the local lookup. If the result status |
266 | * code indicates that there may be more replies, plan | ||
267 | * forwarding the request. | ||
3639 | * | 268 | * |
3640 | * @param cls closure | 269 | * @param cls closure (NULL) |
3641 | * @param success GNUNET_SYSERR on failure | 270 | * @param pr the pending request we were processing |
3642 | * @param msg NULL on success, otherwise an error message | 271 | * @param result final datastore lookup result |
3643 | */ | 272 | */ |
3644 | static void | 273 | static void |
3645 | put_migration_continuation (void *cls, | 274 | consider_forwarding (void *cls, |
3646 | int success, | 275 | struct GSF_PendingRequest *pr, |
3647 | const char *msg) | 276 | enum GNUNET_BLOCK_EvaluationResult result) |
3648 | { | 277 | { |
3649 | struct GNUNET_TIME_Absolute *start = cls; | 278 | if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) |
3650 | struct GNUNET_TIME_Relative delay; | 279 | return; /* we're done... */ |
3651 | 280 | GSF_iterate_connected_peers_ (&consider_request_for_forwarding, | |
3652 | delay = GNUNET_TIME_absolute_get_duration (*start); | 281 | pr); |
3653 | GNUNET_free (start); | ||
3654 | GNUNET_LOAD_update (datastore_put_load, | ||
3655 | delay.rel_value); | ||
3656 | if (GNUNET_OK == success) | ||
3657 | return; | ||
3658 | GNUNET_STATISTICS_update (stats, | ||
3659 | gettext_noop ("# datastore 'put' failures"), | ||
3660 | 1, | ||
3661 | GNUNET_NO); | ||
3662 | } | 282 | } |
3663 | 283 | ||
3664 | 284 | ||
3665 | /** | 285 | /** |
3666 | * Handle P2P "PUT" message. | 286 | * Handle P2P "GET" request. |
3667 | * | 287 | * |
3668 | * @param cls closure, always NULL | 288 | * @param cls closure, always NULL |
3669 | * @param other the other peer involved (sender or receiver, NULL | 289 | * @param other the other peer involved (sender or receiver, NULL |
@@ -3674,963 +294,176 @@ put_migration_continuation (void *cls, | |||
3674 | * GNUNET_SYSERR to close it (signal serious error) | 294 | * GNUNET_SYSERR to close it (signal serious error) |
3675 | */ | 295 | */ |
3676 | static int | 296 | static int |
3677 | handle_p2p_put (void *cls, | 297 | handle_p2p_get (void *cls, |
3678 | const struct GNUNET_PeerIdentity *other, | 298 | const struct GNUNET_PeerIdentity *other, |
3679 | const struct GNUNET_MessageHeader *message, | 299 | const struct GNUNET_MessageHeader *message, |
3680 | const struct GNUNET_TRANSPORT_ATS_Information *atsi) | 300 | const struct GNUNET_TRANSPORT_ATS_Information *atsi) |
3681 | { | 301 | { |
3682 | const struct PutMessage *put; | 302 | struct GSF_PendingRequest *pr; |
3683 | uint16_t msize; | ||
3684 | size_t dsize; | ||
3685 | enum GNUNET_BLOCK_Type type; | ||
3686 | struct GNUNET_TIME_Absolute expiration; | ||
3687 | GNUNET_HashCode query; | ||
3688 | struct ProcessReplyClosure prq; | ||
3689 | struct GNUNET_TIME_Absolute *start; | ||
3690 | struct GNUNET_TIME_Relative block_time; | ||
3691 | double putl; | ||
3692 | struct ConnectedPeer *cp; | ||
3693 | struct PendingMessage *pm; | ||
3694 | struct MigrationStopMessage *msm; | ||
3695 | |||
3696 | msize = ntohs (message->size); | ||
3697 | if (msize < sizeof (struct PutMessage)) | ||
3698 | { | ||
3699 | GNUNET_break_op(0); | ||
3700 | return GNUNET_SYSERR; | ||
3701 | } | ||
3702 | put = (const struct PutMessage*) message; | ||
3703 | dsize = msize - sizeof (struct PutMessage); | ||
3704 | type = ntohl (put->type); | ||
3705 | expiration = GNUNET_TIME_absolute_ntoh (put->expiration); | ||
3706 | 303 | ||
3707 | if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) | 304 | pr = GSF_handle_p2p_query_ (other, message); |
305 | if (NULL == pr) | ||
3708 | return GNUNET_SYSERR; | 306 | return GNUNET_SYSERR; |
3709 | if (GNUNET_OK != | 307 | GSF_local_lookup_ (pr, |
3710 | GNUNET_BLOCK_get_key (block_ctx, | 308 | &consider_forwarding, |
3711 | type, | 309 | NULL); |
3712 | &put[1], | ||
3713 | dsize, | ||
3714 | &query)) | ||
3715 | { | ||
3716 | GNUNET_break_op (0); | ||
3717 | return GNUNET_SYSERR; | ||
3718 | } | ||
3719 | cover_content_count++; | ||
3720 | #if DEBUG_FS | ||
3721 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3722 | "Received result for query `%s' from peer `%4s'\n", | ||
3723 | GNUNET_h2s (&query), | ||
3724 | GNUNET_i2s (other)); | ||
3725 | #endif | ||
3726 | GNUNET_STATISTICS_update (stats, | ||
3727 | gettext_noop ("# replies received (overall)"), | ||
3728 | 1, | ||
3729 | GNUNET_NO); | ||
3730 | /* now, lookup 'query' */ | ||
3731 | prq.data = (const void*) &put[1]; | ||
3732 | if (other != NULL) | ||
3733 | prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
3734 | &other->hashPubKey); | ||
3735 | else | ||
3736 | prq.sender = NULL; | ||
3737 | prq.size = dsize; | ||
3738 | prq.type = type; | ||
3739 | prq.expiration = expiration; | ||
3740 | prq.priority = 0; | ||
3741 | prq.anonymity_level = 1; | ||
3742 | prq.finished = GNUNET_NO; | ||
3743 | prq.request_found = GNUNET_NO; | ||
3744 | GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, | ||
3745 | &query, | ||
3746 | &process_reply, | ||
3747 | &prq); | ||
3748 | if (prq.sender != NULL) | ||
3749 | { | ||
3750 | prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority; | ||
3751 | change_host_trust (prq.sender, prq.priority); | ||
3752 | } | ||
3753 | if ( (GNUNET_YES == active_to_migration) && | ||
3754 | (GNUNET_NO == test_put_load_too_high (prq.priority)) ) | ||
3755 | { | ||
3756 | #if DEBUG_FS | ||
3757 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3758 | "Replicating result for query `%s' with priority %u\n", | ||
3759 | GNUNET_h2s (&query), | ||
3760 | prq.priority); | ||
3761 | #endif | ||
3762 | start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); | ||
3763 | *start = GNUNET_TIME_absolute_get (); | ||
3764 | GNUNET_DATASTORE_put (dsh, | ||
3765 | 0, &query, dsize, &put[1], | ||
3766 | type, prq.priority, 1 /* anonymity */, | ||
3767 | 0 /* replication */, | ||
3768 | expiration, | ||
3769 | 1 + prq.priority, | ||
3770 | MAX_DATASTORE_QUEUE, | ||
3771 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
3772 | &put_migration_continuation, | ||
3773 | start); | ||
3774 | } | ||
3775 | putl = GNUNET_LOAD_get_load (datastore_put_load); | ||
3776 | if ( (NULL != (cp = prq.sender)) && | ||
3777 | (GNUNET_NO == prq.request_found) && | ||
3778 | ( (GNUNET_YES != active_to_migration) || | ||
3779 | (putl > 2.5 * (1 + prq.priority)) ) ) | ||
3780 | { | ||
3781 | if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000) | ||
3782 | return GNUNET_OK; /* already blocked */ | ||
3783 | /* We're too busy; send MigrationStop message! */ | ||
3784 | if (GNUNET_YES != active_to_migration) | ||
3785 | putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5); | ||
3786 | block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, | ||
3787 | 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
3788 | (unsigned int) (60000 * putl * putl))); | ||
3789 | |||
3790 | cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time); | ||
3791 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + | ||
3792 | sizeof (struct MigrationStopMessage)); | ||
3793 | pm->msize = sizeof (struct MigrationStopMessage); | ||
3794 | pm->priority = UINT32_MAX; | ||
3795 | msm = (struct MigrationStopMessage*) &pm[1]; | ||
3796 | msm->header.size = htons (sizeof (struct MigrationStopMessage)); | ||
3797 | msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); | ||
3798 | msm->duration = GNUNET_TIME_relative_hton (block_time); | ||
3799 | add_to_pending_messages_for_peer (cp, | ||
3800 | pm, | ||
3801 | NULL); | ||
3802 | } | ||
3803 | return GNUNET_OK; | ||
3804 | } | ||
3805 | |||
3806 | |||
3807 | /** | ||
3808 | * Handle P2P "MIGRATION_STOP" message. | ||
3809 | * | ||
3810 | * @param cls closure, always NULL | ||
3811 | * @param other the other peer involved (sender or receiver, NULL | ||
3812 | * for loopback messages where we are both sender and receiver) | ||
3813 | * @param message the actual message | ||
3814 | * @param atsi performance information | ||
3815 | * @return GNUNET_OK to keep the connection open, | ||
3816 | * GNUNET_SYSERR to close it (signal serious error) | ||
3817 | */ | ||
3818 | static int | ||
3819 | handle_p2p_migration_stop (void *cls, | ||
3820 | const struct GNUNET_PeerIdentity *other, | ||
3821 | const struct GNUNET_MessageHeader *message, | ||
3822 | const struct GNUNET_TRANSPORT_ATS_Information *atsi) | ||
3823 | { | ||
3824 | struct ConnectedPeer *cp; | ||
3825 | const struct MigrationStopMessage *msm; | ||
3826 | |||
3827 | msm = (const struct MigrationStopMessage*) message; | ||
3828 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
3829 | &other->hashPubKey); | ||
3830 | if (cp == NULL) | ||
3831 | { | ||
3832 | GNUNET_break (0); | ||
3833 | return GNUNET_OK; | ||
3834 | } | ||
3835 | cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration)); | ||
3836 | return GNUNET_OK; | 310 | return GNUNET_OK; |
3837 | } | 311 | } |
3838 | 312 | ||
3839 | 313 | ||
3840 | |||
3841 | /* **************************** P2P GET Handling ************************ */ | ||
3842 | |||
3843 | |||
3844 | /** | ||
3845 | * Closure for 'check_duplicate_request_{peer,client}'. | ||
3846 | */ | ||
3847 | struct CheckDuplicateRequestClosure | ||
3848 | { | ||
3849 | /** | ||
3850 | * The new request we should check if it already exists. | ||
3851 | */ | ||
3852 | const struct PendingRequest *pr; | ||
3853 | |||
3854 | /** | ||
3855 | * Existing request found by the checker, NULL if none. | ||
3856 | */ | ||
3857 | struct PendingRequest *have; | ||
3858 | }; | ||
3859 | |||
3860 | |||
3861 | /** | ||
3862 | * Iterator over entries in the 'query_request_map' that | ||
3863 | * tries to see if we have the same request pending from | ||
3864 | * the same client already. | ||
3865 | * | ||
3866 | * @param cls closure (our 'struct CheckDuplicateRequestClosure') | ||
3867 | * @param key current key code (query, ignored, must match) | ||
3868 | * @param value value in the hash map (a 'struct PendingRequest' | ||
3869 | * that already exists) | ||
3870 | * @return GNUNET_YES if we should continue to | ||
3871 | * iterate (no match yet) | ||
3872 | * GNUNET_NO if not (match found). | ||
3873 | */ | ||
3874 | static int | ||
3875 | check_duplicate_request_client (void *cls, | ||
3876 | const GNUNET_HashCode * key, | ||
3877 | void *value) | ||
3878 | { | ||
3879 | struct CheckDuplicateRequestClosure *cdc = cls; | ||
3880 | struct PendingRequest *have = value; | ||
3881 | |||
3882 | if (have->client_request_list == NULL) | ||
3883 | return GNUNET_YES; | ||
3884 | if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) && | ||
3885 | (cdc->pr != have) ) | ||
3886 | { | ||
3887 | cdc->have = have; | ||
3888 | return GNUNET_NO; | ||
3889 | } | ||
3890 | return GNUNET_YES; | ||
3891 | } | ||
3892 | |||
3893 | |||
3894 | /** | 314 | /** |
3895 | * We're processing (local) results for a search request | 315 | * We're done with the local lookup, now consider |
3896 | * from another peer. Pass applicable results to the | 316 | * P2P processing (depending on request options and |
3897 | * peer and if we are done either clean up (operation | 317 | * result status). Also signal that we can now |
3898 | * complete) or forward to other peers (more results possible). | 318 | * receive more request information from the client. |
3899 | * | 319 | * |
3900 | * @param cls our closure (struct LocalGetContext) | 320 | * @param cls the client doing the request ('struct GNUNET_SERVER_Client') |
3901 | * @param key key for the content | 321 | * @param pr the pending request we were processing |
3902 | * @param size number of bytes in data | 322 | * @param result final datastore lookup result |
3903 | * @param data content stored | ||
3904 | * @param type type of the content | ||
3905 | * @param priority priority of the content | ||
3906 | * @param anonymity anonymity-level for the content | ||
3907 | * @param expiration expiration time for the content | ||
3908 | * @param uid unique identifier for the datum; | ||
3909 | * maybe 0 if no unique identifier is available | ||
3910 | */ | 323 | */ |
3911 | static void | 324 | static void |
3912 | process_local_reply (void *cls, | 325 | start_p2p_processing (void *cls, |
3913 | const GNUNET_HashCode * key, | 326 | struct GSF_PendingRequest *pr, |
3914 | size_t size, | 327 | enum GNUNET_BLOCK_EvaluationResult result) |
3915 | const void *data, | ||
3916 | enum GNUNET_BLOCK_Type type, | ||
3917 | uint32_t priority, | ||
3918 | uint32_t anonymity, | ||
3919 | struct GNUNET_TIME_Absolute | ||
3920 | expiration, | ||
3921 | uint64_t uid) | ||
3922 | { | 328 | { |
3923 | struct PendingRequest *pr = cls; | 329 | struct GNUNET_SERVER_Client *client = cls; |
3924 | struct ProcessReplyClosure prq; | 330 | struct GSF_PendingRequestData *prd; |
3925 | struct CheckDuplicateRequestClosure cdrc; | 331 | |
3926 | GNUNET_HashCode query; | 332 | prd = GSF_pending_request_get_data_ (pr); |
3927 | unsigned int old_rf; | ||
3928 | |||
3929 | if (NULL == key) | ||
3930 | { | ||
3931 | #if DEBUG_FS > 1 | ||
3932 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3933 | "Done processing local replies, forwarding request to other peers.\n"); | ||
3934 | #endif | ||
3935 | pr->qe = NULL; | ||
3936 | if (pr->client_request_list != NULL) | ||
3937 | { | ||
3938 | GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, | ||
3939 | GNUNET_YES); | ||
3940 | /* Figure out if this is a duplicate request and possibly | ||
3941 | merge 'struct PendingRequest' entries */ | ||
3942 | cdrc.have = NULL; | ||
3943 | cdrc.pr = pr; | ||
3944 | GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, | ||
3945 | &pr->query, | ||
3946 | &check_duplicate_request_client, | ||
3947 | &cdrc); | ||
3948 | if (cdrc.have != NULL) | ||
3949 | { | ||
3950 | #if DEBUG_FS | ||
3951 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3952 | "Received request for block `%s' twice from client, will only request once.\n", | ||
3953 | GNUNET_h2s (&pr->query)); | ||
3954 | #endif | ||
3955 | |||
3956 | destroy_pending_request (pr); | ||
3957 | return; | ||
3958 | } | ||
3959 | } | ||
3960 | if (pr->local_only == GNUNET_YES) | ||
3961 | { | ||
3962 | destroy_pending_request (pr); | ||
3963 | return; | ||
3964 | } | ||
3965 | /* no more results */ | ||
3966 | if (pr->task == GNUNET_SCHEDULER_NO_TASK) | ||
3967 | pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task, | ||
3968 | pr); | ||
3969 | return; | ||
3970 | } | ||
3971 | #if DEBUG_FS | 333 | #if DEBUG_FS |
3972 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 334 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
3973 | "New local response to `%s' of type %u.\n", | 335 | "Finished database lookup for local request `%s' with result %d\n", |
3974 | GNUNET_h2s (key), | 336 | GNUNET_h2s (&prd->query), |
3975 | type); | 337 | result); |
3976 | #endif | ||
3977 | if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) | ||
3978 | { | ||
3979 | #if DEBUG_FS | ||
3980 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
3981 | "Found ONDEMAND block, performing on-demand encoding\n"); | ||
3982 | #endif | 338 | #endif |
3983 | GNUNET_STATISTICS_update (stats, | 339 | GNUNET_SERVER_receive_done (client, |
3984 | gettext_noop ("# on-demand blocks matched requests"), | 340 | GNUNET_OK); |
3985 | 1, | 341 | if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) |
3986 | GNUNET_NO); | 342 | return; /* we're done, 'pr' was already destroyed... */ |
3987 | if (GNUNET_OK != | 343 | if (0 != (GSF_PRO_LOCAL_ONLY & prd->options) ) |
3988 | GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, | ||
3989 | anonymity, expiration, uid, | ||
3990 | &process_local_reply, | ||
3991 | pr)) | ||
3992 | if (pr->qe != NULL) | ||
3993 | { | ||
3994 | GNUNET_DATASTORE_iterate_get_next (dsh); | ||
3995 | } | ||
3996 | return; | ||
3997 | } | ||
3998 | old_rf = pr->results_found; | ||
3999 | memset (&prq, 0, sizeof (prq)); | ||
4000 | prq.data = data; | ||
4001 | prq.expiration = expiration; | ||
4002 | prq.size = size; | ||
4003 | if (GNUNET_OK != | ||
4004 | GNUNET_BLOCK_get_key (block_ctx, | ||
4005 | type, | ||
4006 | data, | ||
4007 | size, | ||
4008 | &query)) | ||
4009 | { | 344 | { |
4010 | GNUNET_break (0); | 345 | GSF_pending_request_cancel_ (pr); |
4011 | GNUNET_DATASTORE_remove (dsh, | ||
4012 | key, | ||
4013 | size, data, | ||
4014 | -1, -1, | ||
4015 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
4016 | NULL, NULL); | ||
4017 | GNUNET_DATASTORE_iterate_get_next (dsh); | ||
4018 | return; | 346 | return; |
4019 | } | 347 | } |
4020 | prq.type = type; | 348 | GSF_dht_lookup_ (pr); |
4021 | prq.priority = priority; | 349 | consider_forwarding (NULL, pr, result); |
4022 | prq.finished = GNUNET_NO; | ||
4023 | prq.request_found = GNUNET_NO; | ||
4024 | prq.anonymity_level = anonymity; | ||
4025 | if ( (old_rf == 0) && | ||
4026 | (pr->results_found == 0) ) | ||
4027 | update_datastore_delays (pr->start_time); | ||
4028 | process_reply (&prq, key, pr); | ||
4029 | if (prq.finished == GNUNET_YES) | ||
4030 | return; | ||
4031 | if (pr->qe == NULL) | ||
4032 | return; /* done here */ | ||
4033 | if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) | ||
4034 | { | ||
4035 | pr->local_only = GNUNET_YES; /* do not forward */ | ||
4036 | GNUNET_DATASTORE_iterate_get_next (dsh); | ||
4037 | return; | ||
4038 | } | ||
4039 | if ( (pr->client_request_list == NULL) && | ||
4040 | ( (GNUNET_YES == test_get_load_too_high (0)) || | ||
4041 | (pr->results_found > 5 + 2 * pr->priority) ) ) | ||
4042 | { | ||
4043 | #if DEBUG_FS > 2 | ||
4044 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4045 | "Load too high, done with request\n"); | ||
4046 | #endif | ||
4047 | GNUNET_STATISTICS_update (stats, | ||
4048 | gettext_noop ("# processing result set cut short due to load"), | ||
4049 | 1, | ||
4050 | GNUNET_NO); | ||
4051 | GNUNET_DATASTORE_iterate_get_next (dsh); | ||
4052 | return; | ||
4053 | } | ||
4054 | GNUNET_DATASTORE_iterate_get_next (dsh); | ||
4055 | } | 350 | } |
4056 | 351 | ||
4057 | 352 | ||
4058 | /** | 353 | /** |
4059 | * We've received a request with the specified priority. Bound it | 354 | * Handle START_SEARCH-message (search request from client). |
4060 | * according to how much we trust the given peer. | 355 | * |
4061 | * | 356 | * @param cls closure |
4062 | * @param prio_in requested priority | 357 | * @param client identification of the client |
4063 | * @param cp the peer making the request | 358 | * @param message the actual message |
4064 | * @return effective priority | ||
4065 | */ | 359 | */ |
4066 | static int32_t | 360 | static void |
4067 | bound_priority (uint32_t prio_in, | 361 | handle_start_search (void *cls, |
4068 | struct ConnectedPeer *cp) | 362 | struct GNUNET_SERVER_Client *client, |
363 | const struct GNUNET_MessageHeader *message) | ||
4069 | { | 364 | { |
4070 | #define N ((double)128.0) | 365 | struct GSF_PendingRequest *pr; |
4071 | uint32_t ret; | ||
4072 | double rret; | ||
4073 | int ld; | ||
4074 | 366 | ||
4075 | ld = test_get_load_too_high (0); | 367 | pr = GSF_local_client_start_search_handler_ (client, message); |
4076 | if (ld == GNUNET_SYSERR) | 368 | if (NULL == pr) |
4077 | { | ||
4078 | GNUNET_STATISTICS_update (stats, | ||
4079 | gettext_noop ("# requests done for free (low load)"), | ||
4080 | 1, | ||
4081 | GNUNET_NO); | ||
4082 | return 0; /* excess resources */ | ||
4083 | } | ||
4084 | if (prio_in > INT32_MAX) | ||
4085 | prio_in = INT32_MAX; | ||
4086 | ret = - change_host_trust (cp, - (int) prio_in); | ||
4087 | if (ret > 0) | ||
4088 | { | ||
4089 | if (ret > current_priorities + N) | ||
4090 | rret = current_priorities + N; | ||
4091 | else | ||
4092 | rret = ret; | ||
4093 | current_priorities | ||
4094 | = (current_priorities * (N-1) + rret)/N; | ||
4095 | } | ||
4096 | if ( (ld == GNUNET_YES) && (ret > 0) ) | ||
4097 | { | 369 | { |
4098 | /* try with charging */ | 370 | /* 'GNUNET_SERVER_receive_done was already called! */ |
4099 | ld = test_get_load_too_high (ret); | 371 | return; |
4100 | } | ||
4101 | if (ld == GNUNET_YES) | ||
4102 | { | ||
4103 | GNUNET_STATISTICS_update (stats, | ||
4104 | gettext_noop ("# request dropped, priority insufficient"), | ||
4105 | 1, | ||
4106 | GNUNET_NO); | ||
4107 | /* undo charge */ | ||
4108 | change_host_trust (cp, (int) ret); | ||
4109 | return -1; /* not enough resources */ | ||
4110 | } | ||
4111 | else | ||
4112 | { | ||
4113 | GNUNET_STATISTICS_update (stats, | ||
4114 | gettext_noop ("# requests done for a price (normal load)"), | ||
4115 | 1, | ||
4116 | GNUNET_NO); | ||
4117 | } | 372 | } |
4118 | #undef N | 373 | GSF_local_lookup_ (pr, |
4119 | return ret; | 374 | &start_p2p_processing, |
375 | client); | ||
4120 | } | 376 | } |
4121 | 377 | ||
4122 | 378 | ||
4123 | /** | 379 | /** |
4124 | * Iterator over entries in the 'query_request_map' that | 380 | * Task run during shutdown. |
4125 | * tries to see if we have the same request pending from | ||
4126 | * the same peer already. | ||
4127 | * | 381 | * |
4128 | * @param cls closure (our 'struct CheckDuplicateRequestClosure') | 382 | * @param cls unused |
4129 | * @param key current key code (query, ignored, must match) | 383 | * @param tc unused |
4130 | * @param value value in the hash map (a 'struct PendingRequest' | ||
4131 | * that already exists) | ||
4132 | * @return GNUNET_YES if we should continue to | ||
4133 | * iterate (no match yet) | ||
4134 | * GNUNET_NO if not (match found). | ||
4135 | */ | 384 | */ |
4136 | static int | 385 | static void |
4137 | check_duplicate_request_peer (void *cls, | 386 | shutdown_task (void *cls, |
4138 | const GNUNET_HashCode * key, | 387 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
4139 | void *value) | ||
4140 | { | 388 | { |
4141 | struct CheckDuplicateRequestClosure *cdc = cls; | 389 | if (NULL != GSF_core) |
4142 | struct PendingRequest *have = value; | 390 | { |
4143 | 391 | GNUNET_CORE_disconnect (GSF_core); | |
4144 | if (cdc->pr->target_pid == have->target_pid) | 392 | GSF_core = NULL; |
393 | } | ||
394 | GSF_put_done_ (); | ||
395 | GSF_push_done_ (); | ||
396 | GSF_pending_request_done_ (); | ||
397 | GSF_plan_done (); | ||
398 | GSF_connected_peer_done_ (); | ||
399 | GNUNET_DATASTORE_disconnect (GSF_dsh, GNUNET_NO); | ||
400 | GSF_dsh = NULL; | ||
401 | GNUNET_DHT_disconnect (GSF_dht); | ||
402 | GSF_dht = NULL; | ||
403 | GNUNET_BLOCK_context_destroy (GSF_block_ctx); | ||
404 | GSF_block_ctx = NULL; | ||
405 | GNUNET_CONFIGURATION_destroy (block_cfg); | ||
406 | block_cfg = NULL; | ||
407 | GNUNET_STATISTICS_destroy (GSF_stats, GNUNET_NO); | ||
408 | if (GNUNET_SCHEDULER_NO_TASK != cover_age_task) | ||
4145 | { | 409 | { |
4146 | cdc->have = have; | 410 | GNUNET_SCHEDULER_cancel (cover_age_task); |
4147 | return GNUNET_NO; | 411 | cover_age_task = GNUNET_SCHEDULER_NO_TASK; |
4148 | } | 412 | } |
4149 | return GNUNET_YES; | 413 | GNUNET_FS_indexing_done (); |
414 | GNUNET_LOAD_value_free (datastore_get_load); | ||
415 | datastore_get_load = NULL; | ||
416 | GNUNET_LOAD_value_free (GSF_rt_entry_lifetime); | ||
417 | GSF_rt_entry_lifetime = NULL; | ||
4150 | } | 418 | } |
4151 | 419 | ||
4152 | 420 | ||
4153 | /** | 421 | /** |
4154 | * Handle P2P "GET" request. | 422 | * Function called for each pending request whenever a new |
423 | * peer connects, giving us a chance to decide about submitting | ||
424 | * the existing request to the new peer. | ||
4155 | * | 425 | * |
4156 | * @param cls closure, always NULL | 426 | * @param cls the 'struct GSF_ConnectedPeer' of the new peer |
4157 | * @param other the other peer involved (sender or receiver, NULL | 427 | * @param key query for the request |
4158 | * for loopback messages where we are both sender and receiver) | 428 | * @param pr handle to the pending request |
4159 | * @param message the actual message | 429 | * @return GNUNET_YES to continue to iterate |
4160 | * @param atsi performance information | ||
4161 | * @return GNUNET_OK to keep the connection open, | ||
4162 | * GNUNET_SYSERR to close it (signal serious error) | ||
4163 | */ | 430 | */ |
4164 | static int | 431 | static int |
4165 | handle_p2p_get (void *cls, | 432 | consider_peer_for_forwarding (void *cls, |
4166 | const struct GNUNET_PeerIdentity *other, | 433 | const GNUNET_HashCode *key, |
4167 | const struct GNUNET_MessageHeader *message, | 434 | struct GSF_PendingRequest *pr) |
4168 | const struct GNUNET_TRANSPORT_ATS_Information *atsi) | ||
4169 | { | 435 | { |
4170 | struct PendingRequest *pr; | 436 | struct GSF_ConnectedPeer *cp = cls; |
4171 | struct ConnectedPeer *cp; | ||
4172 | struct ConnectedPeer *cps; | ||
4173 | struct CheckDuplicateRequestClosure cdc; | ||
4174 | struct GNUNET_TIME_Relative timeout; | ||
4175 | uint16_t msize; | ||
4176 | const struct GetMessage *gm; | ||
4177 | unsigned int bits; | ||
4178 | const GNUNET_HashCode *opt; | ||
4179 | uint32_t bm; | ||
4180 | size_t bfsize; | ||
4181 | uint32_t ttl_decrement; | ||
4182 | int32_t priority; | ||
4183 | enum GNUNET_BLOCK_Type type; | ||
4184 | int have_ns; | ||
4185 | |||
4186 | msize = ntohs(message->size); | ||
4187 | if (msize < sizeof (struct GetMessage)) | ||
4188 | { | ||
4189 | GNUNET_break_op (0); | ||
4190 | return GNUNET_SYSERR; | ||
4191 | } | ||
4192 | gm = (const struct GetMessage*) message; | ||
4193 | #if DEBUG_FS | ||
4194 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4195 | "Received request for `%s'\n", | ||
4196 | GNUNET_h2s (&gm->query)); | ||
4197 | #endif | ||
4198 | type = ntohl (gm->type); | ||
4199 | bm = ntohl (gm->hash_bitmap); | ||
4200 | bits = 0; | ||
4201 | while (bm > 0) | ||
4202 | { | ||
4203 | if (1 == (bm & 1)) | ||
4204 | bits++; | ||
4205 | bm >>= 1; | ||
4206 | } | ||
4207 | if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode)) | ||
4208 | { | ||
4209 | GNUNET_break_op (0); | ||
4210 | return GNUNET_SYSERR; | ||
4211 | } | ||
4212 | opt = (const GNUNET_HashCode*) &gm[1]; | ||
4213 | bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode); | ||
4214 | /* bfsize must be power of 2, check! */ | ||
4215 | if (0 != ( (bfsize - 1) & bfsize)) | ||
4216 | { | ||
4217 | GNUNET_break_op (0); | ||
4218 | return GNUNET_SYSERR; | ||
4219 | } | ||
4220 | cover_query_count++; | ||
4221 | bm = ntohl (gm->hash_bitmap); | ||
4222 | bits = 0; | ||
4223 | cps = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
4224 | &other->hashPubKey); | ||
4225 | if (NULL == cps) | ||
4226 | { | ||
4227 | /* peer must have just disconnected */ | ||
4228 | GNUNET_STATISTICS_update (stats, | ||
4229 | gettext_noop ("# requests dropped due to initiator not being connected"), | ||
4230 | 1, | ||
4231 | GNUNET_NO); | ||
4232 | return GNUNET_SYSERR; | ||
4233 | } | ||
4234 | if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) | ||
4235 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
4236 | &opt[bits++]); | ||
4237 | else | ||
4238 | cp = cps; | ||
4239 | if (cp == NULL) | ||
4240 | { | ||
4241 | #if DEBUG_FS | ||
4242 | if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) | ||
4243 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4244 | "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n", | ||
4245 | GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1])); | ||
4246 | |||
4247 | else | ||
4248 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4249 | "Failed to find peer `%4s' in connection set. Dropping query.\n", | ||
4250 | GNUNET_i2s (other)); | ||
4251 | #endif | ||
4252 | GNUNET_STATISTICS_update (stats, | ||
4253 | gettext_noop ("# requests dropped due to missing reverse route"), | ||
4254 | 1, | ||
4255 | GNUNET_NO); | ||
4256 | /* FIXME: try connect? */ | ||
4257 | return GNUNET_OK; | ||
4258 | } | ||
4259 | /* note that we can really only check load here since otherwise | ||
4260 | peers could find out that we are overloaded by not being | ||
4261 | disconnected after sending us a malformed query... */ | ||
4262 | priority = bound_priority (ntohl (gm->priority), cps); | ||
4263 | if (priority < 0) | ||
4264 | { | ||
4265 | #if DEBUG_FS | ||
4266 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4267 | "Dropping query from `%s', this peer is too busy.\n", | ||
4268 | GNUNET_i2s (other)); | ||
4269 | #endif | ||
4270 | return GNUNET_OK; | ||
4271 | } | ||
4272 | #if DEBUG_FS | ||
4273 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4274 | "Received request for `%s' of type %u from peer `%4s' with flags %u\n", | ||
4275 | GNUNET_h2s (&gm->query), | ||
4276 | (unsigned int) type, | ||
4277 | GNUNET_i2s (other), | ||
4278 | (unsigned int) bm); | ||
4279 | #endif | ||
4280 | have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)); | ||
4281 | pr = GNUNET_malloc (sizeof (struct PendingRequest) + | ||
4282 | (have_ns ? sizeof(GNUNET_HashCode) : 0)); | ||
4283 | if (have_ns) | ||
4284 | { | ||
4285 | pr->namespace = (GNUNET_HashCode*) &pr[1]; | ||
4286 | memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode)); | ||
4287 | } | ||
4288 | if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) || | ||
4289 | (GNUNET_LOAD_get_average (cp->transmission_delay) > | ||
4290 | GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) ) | ||
4291 | { | ||
4292 | /* don't have BW to send to peer, or would likely take longer than we have for it, | ||
4293 | so at best indirect the query */ | ||
4294 | priority = 0; | ||
4295 | pr->forward_only = GNUNET_YES; | ||
4296 | } | ||
4297 | pr->type = type; | ||
4298 | pr->mingle = ntohl (gm->filter_mutator); | ||
4299 | if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) | ||
4300 | pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]); | ||
4301 | pr->anonymity_level = 1; | ||
4302 | pr->priority = (uint32_t) priority; | ||
4303 | pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority); | ||
4304 | pr->query = gm->query; | ||
4305 | /* decrement ttl (always) */ | ||
4306 | ttl_decrement = 2 * TTL_DECREMENT + | ||
4307 | GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
4308 | TTL_DECREMENT); | ||
4309 | if ( (pr->ttl < 0) && | ||
4310 | (((int32_t)(pr->ttl - ttl_decrement)) > 0) ) | ||
4311 | { | ||
4312 | #if DEBUG_FS | ||
4313 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4314 | "Dropping query from `%s' due to TTL underflow (%d - %u).\n", | ||
4315 | GNUNET_i2s (other), | ||
4316 | pr->ttl, | ||
4317 | ttl_decrement); | ||
4318 | #endif | ||
4319 | GNUNET_STATISTICS_update (stats, | ||
4320 | gettext_noop ("# requests dropped due TTL underflow"), | ||
4321 | 1, | ||
4322 | GNUNET_NO); | ||
4323 | /* integer underflow => drop (should be very rare)! */ | ||
4324 | GNUNET_free (pr); | ||
4325 | return GNUNET_OK; | ||
4326 | } | ||
4327 | pr->ttl -= ttl_decrement; | ||
4328 | pr->start_time = GNUNET_TIME_absolute_get (); | ||
4329 | |||
4330 | /* get bloom filter */ | ||
4331 | if (bfsize > 0) | ||
4332 | { | ||
4333 | pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits], | ||
4334 | bfsize, | ||
4335 | BLOOMFILTER_K); | ||
4336 | pr->bf_size = bfsize; | ||
4337 | } | ||
4338 | cdc.have = NULL; | ||
4339 | cdc.pr = pr; | ||
4340 | GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, | ||
4341 | &gm->query, | ||
4342 | &check_duplicate_request_peer, | ||
4343 | &cdc); | ||
4344 | if (cdc.have != NULL) | ||
4345 | { | ||
4346 | if (cdc.have->start_time.abs_value + cdc.have->ttl >= | ||
4347 | pr->start_time.abs_value + pr->ttl) | ||
4348 | { | ||
4349 | /* existing request has higher TTL, drop new one! */ | ||
4350 | cdc.have->priority += pr->priority; | ||
4351 | destroy_pending_request (pr); | ||
4352 | #if DEBUG_FS | ||
4353 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4354 | "Have existing request with higher TTL, dropping new request.\n", | ||
4355 | GNUNET_i2s (other)); | ||
4356 | #endif | ||
4357 | GNUNET_STATISTICS_update (stats, | ||
4358 | gettext_noop ("# requests dropped due to higher-TTL request"), | ||
4359 | 1, | ||
4360 | GNUNET_NO); | ||
4361 | return GNUNET_OK; | ||
4362 | } | ||
4363 | else | ||
4364 | { | ||
4365 | /* existing request has lower TTL, drop old one! */ | ||
4366 | pr->priority += cdc.have->priority; | ||
4367 | /* Possible optimization: if we have applicable pending | ||
4368 | replies in 'cdc.have', we might want to move those over | ||
4369 | (this is a really rare special-case, so it is not clear | ||
4370 | that this would be worth it) */ | ||
4371 | destroy_pending_request (cdc.have); | ||
4372 | /* keep processing 'pr'! */ | ||
4373 | } | ||
4374 | } | ||
4375 | |||
4376 | pr->cp = cp; | ||
4377 | GNUNET_break (GNUNET_OK == | ||
4378 | GNUNET_CONTAINER_multihashmap_put (query_request_map, | ||
4379 | &gm->query, | ||
4380 | pr, | ||
4381 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | ||
4382 | GNUNET_break (GNUNET_OK == | ||
4383 | GNUNET_CONTAINER_multihashmap_put (peer_request_map, | ||
4384 | &other->hashPubKey, | ||
4385 | pr, | ||
4386 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | ||
4387 | 437 | ||
4388 | pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, | 438 | GSF_plan_add_ (cp, pr); |
4389 | pr, | 439 | return GNUNET_YES; |
4390 | pr->start_time.abs_value + pr->ttl); | ||
4391 | |||
4392 | GNUNET_STATISTICS_update (stats, | ||
4393 | gettext_noop ("# P2P searches received"), | ||
4394 | 1, | ||
4395 | GNUNET_NO); | ||
4396 | GNUNET_STATISTICS_update (stats, | ||
4397 | gettext_noop ("# P2P searches active"), | ||
4398 | 1, | ||
4399 | GNUNET_NO); | ||
4400 | |||
4401 | /* calculate change in traffic preference */ | ||
4402 | cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE; | ||
4403 | /* process locally */ | ||
4404 | if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK) | ||
4405 | type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */ | ||
4406 | timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY, | ||
4407 | (pr->priority + 1)); | ||
4408 | if (GNUNET_YES != pr->forward_only) | ||
4409 | { | ||
4410 | #if DEBUG_FS | ||
4411 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4412 | "Handing request for `%s' to datastore\n", | ||
4413 | GNUNET_h2s (&gm->query)); | ||
4414 | #endif | ||
4415 | pr->qe = GNUNET_DATASTORE_iterate_key (dsh, | ||
4416 | &gm->query, | ||
4417 | type, | ||
4418 | pr->priority + 1, | ||
4419 | MAX_DATASTORE_QUEUE, | ||
4420 | timeout, | ||
4421 | &process_local_reply, | ||
4422 | pr); | ||
4423 | if (NULL == pr->qe) | ||
4424 | { | ||
4425 | GNUNET_STATISTICS_update (stats, | ||
4426 | gettext_noop ("# requests dropped by datastore (queue length limit)"), | ||
4427 | 1, | ||
4428 | GNUNET_NO); | ||
4429 | } | ||
4430 | } | ||
4431 | else | ||
4432 | { | ||
4433 | GNUNET_STATISTICS_update (stats, | ||
4434 | gettext_noop ("# requests forwarded due to high load"), | ||
4435 | 1, | ||
4436 | GNUNET_NO); | ||
4437 | } | ||
4438 | |||
4439 | /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */ | ||
4440 | switch (pr->type) | ||
4441 | { | ||
4442 | case GNUNET_BLOCK_TYPE_FS_DBLOCK: | ||
4443 | case GNUNET_BLOCK_TYPE_FS_IBLOCK: | ||
4444 | /* only one result, wait for datastore */ | ||
4445 | if (GNUNET_YES != pr->forward_only) | ||
4446 | { | ||
4447 | GNUNET_STATISTICS_update (stats, | ||
4448 | gettext_noop ("# requests not instantly forwarded (waiting for datastore)"), | ||
4449 | 1, | ||
4450 | GNUNET_NO); | ||
4451 | break; | ||
4452 | } | ||
4453 | default: | ||
4454 | if (pr->task == GNUNET_SCHEDULER_NO_TASK) | ||
4455 | pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task, | ||
4456 | pr); | ||
4457 | } | ||
4458 | |||
4459 | /* make sure we don't track too many requests */ | ||
4460 | if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) | ||
4461 | { | ||
4462 | pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap); | ||
4463 | GNUNET_assert (pr != NULL); | ||
4464 | destroy_pending_request (pr); | ||
4465 | } | ||
4466 | return GNUNET_OK; | ||
4467 | } | 440 | } |
4468 | 441 | ||
4469 | 442 | ||
4470 | /* **************************** CS GET Handling ************************ */ | ||
4471 | |||
4472 | |||
4473 | /** | 443 | /** |
4474 | * Handle START_SEARCH-message (search request from client). | 444 | * Method called whenever a given peer connects. |
4475 | * | 445 | * |
4476 | * @param cls closure | 446 | * @param cls closure, not used |
4477 | * @param client identification of the client | 447 | * @param peer peer identity this notification is about |
4478 | * @param message the actual message | 448 | * @param atsi performance information |
4479 | */ | 449 | */ |
4480 | static void | 450 | static void |
4481 | handle_start_search (void *cls, | 451 | peer_connect_handler (void *cls, |
4482 | struct GNUNET_SERVER_Client *client, | 452 | const struct GNUNET_PeerIdentity *peer, |
4483 | const struct GNUNET_MessageHeader *message) | 453 | const struct GNUNET_TRANSPORT_ATS_Information *atsi) |
4484 | { | 454 | { |
4485 | static GNUNET_HashCode all_zeros; | 455 | struct GSF_ConnectedPeer *cp; |
4486 | const struct SearchMessage *sm; | ||
4487 | struct ClientList *cl; | ||
4488 | struct ClientRequestList *crl; | ||
4489 | struct PendingRequest *pr; | ||
4490 | uint16_t msize; | ||
4491 | unsigned int sc; | ||
4492 | enum GNUNET_BLOCK_Type type; | ||
4493 | 456 | ||
4494 | msize = ntohs (message->size); | 457 | if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity))) |
4495 | if ( (msize < sizeof (struct SearchMessage)) || | 458 | return; |
4496 | (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) ) | 459 | cp = GSF_peer_connect_handler_ (peer, atsi); |
4497 | { | 460 | if (NULL == cp) |
4498 | GNUNET_break (0); | 461 | return; |
4499 | GNUNET_SERVER_receive_done (client, | 462 | GSF_iterate_pending_requests_ (&consider_peer_for_forwarding, |
4500 | GNUNET_SYSERR); | 463 | cp); |
4501 | return; | ||
4502 | } | ||
4503 | GNUNET_STATISTICS_update (stats, | ||
4504 | gettext_noop ("# client searches received"), | ||
4505 | 1, | ||
4506 | GNUNET_NO); | ||
4507 | sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode); | ||
4508 | sm = (const struct SearchMessage*) message; | ||
4509 | type = ntohl (sm->type); | ||
4510 | #if DEBUG_FS | ||
4511 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4512 | "Received request for `%s' of type %u from local client\n", | ||
4513 | GNUNET_h2s (&sm->query), | ||
4514 | (unsigned int) type); | ||
4515 | #endif | ||
4516 | cl = client_list; | ||
4517 | while ( (cl != NULL) && | ||
4518 | (cl->client != client) ) | ||
4519 | cl = cl->next; | ||
4520 | if (cl == NULL) | ||
4521 | { | ||
4522 | cl = GNUNET_malloc (sizeof (struct ClientList)); | ||
4523 | cl->client = client; | ||
4524 | GNUNET_SERVER_client_keep (client); | ||
4525 | cl->next = client_list; | ||
4526 | client_list = cl; | ||
4527 | } | ||
4528 | /* detect duplicate KBLOCK requests */ | ||
4529 | if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) || | ||
4530 | (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) || | ||
4531 | (type == GNUNET_BLOCK_TYPE_ANY) ) | ||
4532 | { | ||
4533 | crl = cl->rl_head; | ||
4534 | while ( (crl != NULL) && | ||
4535 | ( (0 != memcmp (&crl->req->query, | ||
4536 | &sm->query, | ||
4537 | sizeof (GNUNET_HashCode))) || | ||
4538 | (crl->req->type != type) ) ) | ||
4539 | crl = crl->next; | ||
4540 | if (crl != NULL) | ||
4541 | { | ||
4542 | #if DEBUG_FS | ||
4543 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
4544 | "Have existing request, merging content-seen lists.\n"); | ||
4545 | #endif | ||
4546 | pr = crl->req; | ||
4547 | /* Duplicate request (used to send long list of | ||
4548 | known/blocked results); merge 'pr->replies_seen' | ||
4549 | and update bloom filter */ | ||
4550 | GNUNET_array_grow (pr->replies_seen, | ||
4551 | pr->replies_seen_size, | ||
4552 | pr->replies_seen_off + sc); | ||
4553 | memcpy (&pr->replies_seen[pr->replies_seen_off], | ||
4554 | &sm[1], | ||
4555 | sc * sizeof (GNUNET_HashCode)); | ||
4556 | pr->replies_seen_off += sc; | ||
4557 | refresh_bloomfilter (pr); | ||
4558 | GNUNET_STATISTICS_update (stats, | ||
4559 | gettext_noop ("# client searches updated (merged content seen list)"), | ||
4560 | 1, | ||
4561 | GNUNET_NO); | ||
4562 | GNUNET_SERVER_receive_done (client, | ||
4563 | GNUNET_OK); | ||
4564 | return; | ||
4565 | } | ||
4566 | } | ||
4567 | GNUNET_STATISTICS_update (stats, | ||
4568 | gettext_noop ("# client searches active"), | ||
4569 | 1, | ||
4570 | GNUNET_NO); | ||
4571 | pr = GNUNET_malloc (sizeof (struct PendingRequest) + | ||
4572 | ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0)); | ||
4573 | crl = GNUNET_malloc (sizeof (struct ClientRequestList)); | ||
4574 | memset (crl, 0, sizeof (struct ClientRequestList)); | ||
4575 | crl->client_list = cl; | ||
4576 | GNUNET_CONTAINER_DLL_insert (cl->rl_head, | ||
4577 | cl->rl_tail, | ||
4578 | crl); | ||
4579 | crl->req = pr; | ||
4580 | pr->type = type; | ||
4581 | pr->client_request_list = crl; | ||
4582 | GNUNET_array_grow (pr->replies_seen, | ||
4583 | pr->replies_seen_size, | ||
4584 | sc); | ||
4585 | memcpy (pr->replies_seen, | ||
4586 | &sm[1], | ||
4587 | sc * sizeof (GNUNET_HashCode)); | ||
4588 | pr->replies_seen_off = sc; | ||
4589 | pr->anonymity_level = ntohl (sm->anonymity_level); | ||
4590 | pr->start_time = GNUNET_TIME_absolute_get (); | ||
4591 | refresh_bloomfilter (pr); | ||
4592 | pr->query = sm->query; | ||
4593 | if (0 == (1 & ntohl (sm->options))) | ||
4594 | pr->local_only = GNUNET_NO; | ||
4595 | else | ||
4596 | pr->local_only = GNUNET_YES; | ||
4597 | switch (type) | ||
4598 | { | ||
4599 | case GNUNET_BLOCK_TYPE_FS_DBLOCK: | ||
4600 | case GNUNET_BLOCK_TYPE_FS_IBLOCK: | ||
4601 | if (0 != memcmp (&sm->target, | ||
4602 | &all_zeros, | ||
4603 | sizeof (GNUNET_HashCode))) | ||
4604 | pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target); | ||
4605 | break; | ||
4606 | case GNUNET_BLOCK_TYPE_FS_SBLOCK: | ||
4607 | pr->namespace = (GNUNET_HashCode*) &pr[1]; | ||
4608 | memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode)); | ||
4609 | break; | ||
4610 | default: | ||
4611 | break; | ||
4612 | } | ||
4613 | GNUNET_break (GNUNET_OK == | ||
4614 | GNUNET_CONTAINER_multihashmap_put (query_request_map, | ||
4615 | &sm->query, | ||
4616 | pr, | ||
4617 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | ||
4618 | if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK) | ||
4619 | type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */ | ||
4620 | pr->qe = GNUNET_DATASTORE_iterate_key (dsh, | ||
4621 | &sm->query, | ||
4622 | type, | ||
4623 | -3, -1, | ||
4624 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
4625 | &process_local_reply, | ||
4626 | pr); | ||
4627 | } | 464 | } |
4628 | 465 | ||
4629 | 466 | ||
4630 | /* **************************** Startup ************************ */ | ||
4631 | |||
4632 | |||
4633 | |||
4634 | /** | 467 | /** |
4635 | * Function called after GNUNET_CORE_connect has succeeded | 468 | * Function called after GNUNET_CORE_connect has succeeded |
4636 | * (or failed for good). Note that the private key of the | 469 | * (or failed for good). Note that the private key of the |
@@ -4656,8 +489,6 @@ peer_init_handler (void *cls, | |||
4656 | } | 489 | } |
4657 | 490 | ||
4658 | 491 | ||
4659 | |||
4660 | |||
4661 | /** | 492 | /** |
4662 | * Process fs requests. | 493 | * Process fs requests. |
4663 | * | 494 | * |
@@ -4674,7 +505,7 @@ main_init (struct GNUNET_SERVER_Handle *server, | |||
4674 | GNUNET_MESSAGE_TYPE_FS_GET, 0 }, | 505 | GNUNET_MESSAGE_TYPE_FS_GET, 0 }, |
4675 | { &handle_p2p_put, | 506 | { &handle_p2p_put, |
4676 | GNUNET_MESSAGE_TYPE_FS_PUT, 0 }, | 507 | GNUNET_MESSAGE_TYPE_FS_PUT, 0 }, |
4677 | { &handle_p2p_migration_stop, | 508 | { &GSF_handle_p2p_migration_stop_, |
4678 | GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP, | 509 | GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP, |
4679 | sizeof (struct MigrationStopMessage) }, | 510 | sizeof (struct MigrationStopMessage) }, |
4680 | { NULL, 0, 0 } | 511 | { NULL, 0, 0 } |
@@ -4690,91 +521,32 @@ main_init (struct GNUNET_SERVER_Handle *server, | |||
4690 | 0 }, | 521 | 0 }, |
4691 | {NULL, NULL, 0, 0} | 522 | {NULL, NULL, 0, 0} |
4692 | }; | 523 | }; |
4693 | unsigned long long enc = 128; | ||
4694 | 524 | ||
4695 | cfg = c; | 525 | GSF_core = GNUNET_CORE_connect (GSF_cfg, |
4696 | stats = GNUNET_STATISTICS_create ("fs", cfg); | 526 | 2, /* larger? */ |
4697 | min_migration_delay = GNUNET_TIME_UNIT_SECONDS; | 527 | NULL, |
4698 | if ( (GNUNET_OK != | 528 | &peer_init_handler, |
4699 | GNUNET_CONFIGURATION_get_value_number (cfg, | 529 | &peer_connect_handler, |
4700 | "fs", | 530 | &GSF_peer_disconnect_handler_, |
4701 | "MAX_PENDING_REQUESTS", | 531 | &GSF_peer_status_handler_, |
4702 | &max_pending_requests)) || | 532 | NULL, GNUNET_NO, |
4703 | (GNUNET_OK != | 533 | NULL, GNUNET_NO, |
4704 | GNUNET_CONFIGURATION_get_value_number (cfg, | 534 | p2p_handlers); |
4705 | "fs", | 535 | if (NULL == GSF_core) |
4706 | "EXPECTED_NEIGHBOUR_COUNT", | ||
4707 | &enc)) || | ||
4708 | (GNUNET_OK != | ||
4709 | GNUNET_CONFIGURATION_get_value_time (cfg, | ||
4710 | "fs", | ||
4711 | "MIN_MIGRATION_DELAY", | ||
4712 | &min_migration_delay)) ) | ||
4713 | { | ||
4714 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
4715 | _("Configuration fails to specify certain parameters, assuming default values.")); | ||
4716 | } | ||
4717 | connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); | ||
4718 | query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests); | ||
4719 | rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL); | ||
4720 | peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc); | ||
4721 | requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | ||
4722 | core = GNUNET_CORE_connect (cfg, | ||
4723 | 1, /* larger? */ | ||
4724 | NULL, | ||
4725 | &peer_init_handler, | ||
4726 | &peer_connect_handler, | ||
4727 | &peer_disconnect_handler, | ||
4728 | &peer_status_handler, | ||
4729 | NULL, GNUNET_NO, | ||
4730 | NULL, GNUNET_NO, | ||
4731 | p2p_handlers); | ||
4732 | if (NULL == core) | ||
4733 | { | 536 | { |
4734 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 537 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
4735 | _("Failed to connect to `%s' service.\n"), | 538 | _("Failed to connect to `%s' service.\n"), |
4736 | "core"); | 539 | "core"); |
4737 | GNUNET_CONTAINER_multihashmap_destroy (connected_peers); | ||
4738 | connected_peers = NULL; | ||
4739 | GNUNET_CONTAINER_multihashmap_destroy (query_request_map); | ||
4740 | query_request_map = NULL; | ||
4741 | GNUNET_LOAD_value_free (rt_entry_lifetime); | ||
4742 | rt_entry_lifetime = NULL; | ||
4743 | GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap); | ||
4744 | requests_by_expiration_heap = NULL; | ||
4745 | GNUNET_CONTAINER_multihashmap_destroy (peer_request_map); | ||
4746 | peer_request_map = NULL; | ||
4747 | if (dsh != NULL) | ||
4748 | { | ||
4749 | GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO); | ||
4750 | dsh = NULL; | ||
4751 | } | ||
4752 | return GNUNET_SYSERR; | 540 | return GNUNET_SYSERR; |
4753 | } | 541 | } |
4754 | if (active_from_migration) | ||
4755 | { | ||
4756 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
4757 | _("Content migration is enabled, will start to gather data\n")); | ||
4758 | consider_migration_gathering (); | ||
4759 | } | ||
4760 | consider_dht_put_gathering (NULL); | ||
4761 | GNUNET_SERVER_disconnect_notify (server, | 542 | GNUNET_SERVER_disconnect_notify (server, |
4762 | &handle_client_disconnect, | 543 | &GSF_client_disconnect_handler_, |
4763 | NULL); | 544 | NULL); |
4764 | GNUNET_assert (GNUNET_OK == | ||
4765 | GNUNET_CONFIGURATION_get_value_filename (cfg, | ||
4766 | "fs", | ||
4767 | "TRUST", | ||
4768 | &trustDirectory)); | ||
4769 | GNUNET_DISK_directory_create (trustDirectory); | ||
4770 | GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH, | ||
4771 | &cron_flush_trust, NULL); | ||
4772 | |||
4773 | |||
4774 | GNUNET_SERVER_add_handlers (server, handlers); | 545 | GNUNET_SERVER_add_handlers (server, handlers); |
4775 | cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, | 546 | cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, |
4776 | &age_cover_counters, | 547 | &age_cover_counters, |
4777 | NULL); | 548 | NULL); |
549 | datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); | ||
4778 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, | 550 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, |
4779 | &shutdown_task, | 551 | &shutdown_task, |
4780 | NULL); | 552 | NULL); |
@@ -4794,45 +566,35 @@ run (void *cls, | |||
4794 | struct GNUNET_SERVER_Handle *server, | 566 | struct GNUNET_SERVER_Handle *server, |
4795 | const struct GNUNET_CONFIGURATION_Handle *cfg) | 567 | const struct GNUNET_CONFIGURATION_Handle *cfg) |
4796 | { | 568 | { |
4797 | active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg, | 569 | GSF_cfg = cfg; |
4798 | "FS", | 570 | GSF_dsh = GNUNET_DATASTORE_connect (cfg); |
4799 | "CONTENT_CACHING"); | 571 | if (NULL == GSF_dsh) |
4800 | active_from_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg, | ||
4801 | "FS", | ||
4802 | "CONTENT_PUSHING"); | ||
4803 | dsh = GNUNET_DATASTORE_connect (cfg); | ||
4804 | if (dsh == NULL) | ||
4805 | { | 572 | { |
4806 | GNUNET_SCHEDULER_shutdown (); | 573 | GNUNET_SCHEDULER_shutdown (); |
4807 | return; | 574 | return; |
4808 | } | 575 | } |
4809 | datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); | 576 | GSF_rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL); |
4810 | datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); | 577 | GSF_stats = GNUNET_STATISTICS_create ("fs", cfg); |
4811 | block_cfg = GNUNET_CONFIGURATION_create (); | 578 | block_cfg = GNUNET_CONFIGURATION_create (); |
4812 | GNUNET_CONFIGURATION_set_value_string (block_cfg, | 579 | GNUNET_CONFIGURATION_set_value_string (block_cfg, |
4813 | "block", | 580 | "block", |
4814 | "PLUGINS", | 581 | "PLUGINS", |
4815 | "fs"); | 582 | "fs"); |
4816 | block_ctx = GNUNET_BLOCK_context_create (block_cfg); | 583 | GSF_block_ctx = GNUNET_BLOCK_context_create (block_cfg); |
4817 | GNUNET_assert (NULL != block_ctx); | 584 | GNUNET_assert (NULL != GSF_block_ctx); |
4818 | dht_handle = GNUNET_DHT_connect (cfg, | 585 | GSF_dht = GNUNET_DHT_connect (cfg, |
4819 | FS_DHT_HT_SIZE); | 586 | FS_DHT_HT_SIZE); |
4820 | if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, dsh)) || | 587 | GSF_plan_init (); |
588 | GSF_pending_request_init_ (); | ||
589 | GSF_connected_peer_init_ (); | ||
590 | GSF_push_init_ (); | ||
591 | GSF_put_init_ (); | ||
592 | if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, GSF_dsh)) || | ||
593 | |||
4821 | (GNUNET_OK != main_init (server, cfg)) ) | 594 | (GNUNET_OK != main_init (server, cfg)) ) |
4822 | { | 595 | { |
4823 | GNUNET_SCHEDULER_shutdown (); | 596 | GNUNET_SCHEDULER_shutdown (); |
4824 | GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO); | 597 | shutdown_task (NULL, NULL); |
4825 | dsh = NULL; | ||
4826 | GNUNET_DHT_disconnect (dht_handle); | ||
4827 | dht_handle = NULL; | ||
4828 | GNUNET_BLOCK_context_destroy (block_ctx); | ||
4829 | block_ctx = NULL; | ||
4830 | GNUNET_CONFIGURATION_destroy (block_cfg); | ||
4831 | block_cfg = NULL; | ||
4832 | GNUNET_LOAD_value_free (datastore_get_load); | ||
4833 | datastore_get_load = NULL; | ||
4834 | GNUNET_LOAD_value_free (datastore_put_load); | ||
4835 | datastore_put_load = NULL; | ||
4836 | return; | 598 | return; |
4837 | } | 599 | } |
4838 | } | 600 | } |
diff --git a/src/fs/gnunet-service-fs_new.c b/src/fs/gnunet-service-fs_new.c deleted file mode 100644 index 20a98e6f2..000000000 --- a/src/fs/gnunet-service-fs_new.c +++ /dev/null | |||
@@ -1,621 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 3, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file fs/gnunet-service-fs.c | ||
23 | * @brief gnunet anonymity protocol implementation | ||
24 | * @author Christian Grothoff | ||
25 | * | ||
26 | * To use: | ||
27 | * - consider re-issue GSF_dht_lookup_ after non-DHT reply received | ||
28 | * - implement 'SUPPORT_DELAYS' | ||
29 | * | ||
30 | */ | ||
31 | #include "platform.h" | ||
32 | #include <float.h> | ||
33 | #include "gnunet_constants.h" | ||
34 | #include "gnunet_core_service.h" | ||
35 | #include "gnunet_dht_service.h" | ||
36 | #include "gnunet_datastore_service.h" | ||
37 | #include "gnunet_load_lib.h" | ||
38 | #include "gnunet_peer_lib.h" | ||
39 | #include "gnunet_protocols.h" | ||
40 | #include "gnunet_signatures.h" | ||
41 | #include "gnunet_statistics_service.h" | ||
42 | #include "gnunet_transport_service.h" | ||
43 | #include "gnunet_util_lib.h" | ||
44 | #include "gnunet-service-fs_cp.h" | ||
45 | #include "gnunet-service-fs_indexing.h" | ||
46 | #include "gnunet-service-fs_lc.h" | ||
47 | #include "gnunet-service-fs_pe.h" | ||
48 | #include "gnunet-service-fs_pr.h" | ||
49 | #include "gnunet-service-fs_push.h" | ||
50 | #include "gnunet-service-fs_put.h" | ||
51 | #include "fs.h" | ||
52 | |||
53 | /** | ||
54 | * Size for the hash map for DHT requests from the FS | ||
55 | * service. Should be about the number of concurrent | ||
56 | * DHT requests we plan to make. | ||
57 | */ | ||
58 | #define FS_DHT_HT_SIZE 1024 | ||
59 | |||
60 | |||
61 | /** | ||
62 | * How quickly do we age cover traffic? At the given | ||
63 | * time interval, remaining cover traffic counters are | ||
64 | * decremented by 1/16th. | ||
65 | */ | ||
66 | #define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) | ||
67 | |||
68 | |||
69 | /* ****************************** globals ****************************** */ | ||
70 | |||
71 | /** | ||
72 | * Our connection to the datastore. | ||
73 | */ | ||
74 | struct GNUNET_DATASTORE_Handle *GSF_dsh; | ||
75 | |||
76 | /** | ||
77 | * Our configuration. | ||
78 | */ | ||
79 | const struct GNUNET_CONFIGURATION_Handle *GSF_cfg; | ||
80 | |||
81 | /** | ||
82 | * Handle for reporting statistics. | ||
83 | */ | ||
84 | struct GNUNET_STATISTICS_Handle *GSF_stats; | ||
85 | |||
86 | /** | ||
87 | * Handle for DHT operations. | ||
88 | */ | ||
89 | struct GNUNET_DHT_Handle *GSF_dht; | ||
90 | |||
91 | /** | ||
92 | * How long do requests typically stay in the routing table? | ||
93 | */ | ||
94 | struct GNUNET_LOAD_Value *GSF_rt_entry_lifetime; | ||
95 | |||
96 | /** | ||
97 | * Typical priorities we're seeing from other peers right now. Since | ||
98 | * most priorities will be zero, this value is the weighted average of | ||
99 | * non-zero priorities seen "recently". In order to ensure that new | ||
100 | * values do not dramatically change the ratio, values are first | ||
101 | * "capped" to a reasonable range (+N of the current value) and then | ||
102 | * averaged into the existing value by a ratio of 1:N. Hence | ||
103 | * receiving the largest possible priority can still only raise our | ||
104 | * "current_priorities" by at most 1. | ||
105 | */ | ||
106 | double GSF_current_priorities; | ||
107 | |||
108 | /** | ||
109 | * How many query messages have we received 'recently' that | ||
110 | * have not yet been claimed as cover traffic? | ||
111 | */ | ||
112 | unsigned int GSF_cover_query_count; | ||
113 | |||
114 | /** | ||
115 | * How many content messages have we received 'recently' that | ||
116 | * have not yet been claimed as cover traffic? | ||
117 | */ | ||
118 | unsigned int GSF_cover_content_count; | ||
119 | |||
120 | /** | ||
121 | * Our block context. | ||
122 | */ | ||
123 | struct GNUNET_BLOCK_Context *GSF_block_ctx; | ||
124 | |||
125 | /** | ||
126 | * Pointer to handle to the core service (points to NULL until we've | ||
127 | * connected to it). | ||
128 | */ | ||
129 | struct GNUNET_CORE_Handle *GSF_core; | ||
130 | |||
131 | |||
132 | /* ***************************** locals ******************************* */ | ||
133 | |||
134 | /** | ||
135 | * Configuration for block library. | ||
136 | */ | ||
137 | static struct GNUNET_CONFIGURATION_Handle *block_cfg; | ||
138 | |||
139 | /** | ||
140 | * ID of our task that we use to age the cover counters. | ||
141 | */ | ||
142 | static GNUNET_SCHEDULER_TaskIdentifier cover_age_task; | ||
143 | |||
144 | /** | ||
145 | * Datastore 'GET' load tracking. | ||
146 | */ | ||
147 | static struct GNUNET_LOAD_Value *datastore_get_load; | ||
148 | |||
149 | /** | ||
150 | * Identity of this peer. | ||
151 | */ | ||
152 | static struct GNUNET_PeerIdentity my_id; | ||
153 | |||
154 | /** | ||
155 | * Task that periodically ages our cover traffic statistics. | ||
156 | * | ||
157 | * @param cls unused closure | ||
158 | * @param tc task context | ||
159 | */ | ||
160 | static void | ||
161 | age_cover_counters (void *cls, | ||
162 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
163 | { | ||
164 | GSF_cover_content_count = (GSF_cover_content_count * 15) / 16; | ||
165 | GSF_cover_query_count = (GSF_cover_query_count * 15) / 16; | ||
166 | cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, | ||
167 | &age_cover_counters, | ||
168 | NULL); | ||
169 | } | ||
170 | |||
171 | |||
172 | |||
173 | /** | ||
174 | * We've just now completed a datastore request. Update our | ||
175 | * datastore load calculations. | ||
176 | * | ||
177 | * @param start time when the datastore request was issued | ||
178 | */ | ||
179 | void | ||
180 | GSF_update_datastore_delay_ (struct GNUNET_TIME_Absolute start) | ||
181 | { | ||
182 | struct GNUNET_TIME_Relative delay; | ||
183 | |||
184 | delay = GNUNET_TIME_absolute_get_duration (start); | ||
185 | GNUNET_LOAD_update (datastore_get_load, | ||
186 | delay.rel_value); | ||
187 | } | ||
188 | |||
189 | |||
190 | /** | ||
191 | * Test if the DATABASE (GET) load on this peer is too high | ||
192 | * to even consider processing the query at | ||
193 | * all. | ||
194 | * | ||
195 | * @return GNUNET_YES if the load is too high to do anything (load high) | ||
196 | * GNUNET_NO to process normally (load normal) | ||
197 | * GNUNET_SYSERR to process for free (load low) | ||
198 | */ | ||
199 | int | ||
200 | GSF_test_get_load_too_high_ (uint32_t priority) | ||
201 | { | ||
202 | double ld; | ||
203 | |||
204 | ld = GNUNET_LOAD_get_load (datastore_get_load); | ||
205 | if (ld < 1) | ||
206 | return GNUNET_SYSERR; | ||
207 | if (ld <= priority) | ||
208 | return GNUNET_NO; | ||
209 | return GNUNET_YES; | ||
210 | } | ||
211 | |||
212 | |||
213 | /** | ||
214 | * Handle P2P "PUT" message. | ||
215 | * | ||
216 | * @param cls closure, always NULL | ||
217 | * @param other the other peer involved (sender or receiver, NULL | ||
218 | * for loopback messages where we are both sender and receiver) | ||
219 | * @param message the actual message | ||
220 | * @param atsi performance information | ||
221 | * @return GNUNET_OK to keep the connection open, | ||
222 | * GNUNET_SYSERR to close it (signal serious error) | ||
223 | */ | ||
224 | static int | ||
225 | handle_p2p_put (void *cls, | ||
226 | const struct GNUNET_PeerIdentity *other, | ||
227 | const struct GNUNET_MessageHeader *message, | ||
228 | const struct GNUNET_TRANSPORT_ATS_Information *atsi) | ||
229 | { | ||
230 | struct GSF_ConnectedPeer *cp; | ||
231 | |||
232 | cp = GSF_peer_get_ (other); | ||
233 | if (NULL == cp) | ||
234 | { | ||
235 | GNUNET_break (0); | ||
236 | return GNUNET_OK; | ||
237 | } | ||
238 | return GSF_handle_p2p_content_ (cp, message); | ||
239 | } | ||
240 | |||
241 | |||
242 | /** | ||
243 | * We have a new request, consider forwarding it to the given | ||
244 | * peer. | ||
245 | * | ||
246 | * @param cls the 'struct GSF_PendingRequest' | ||
247 | * @param peer identity of the peer | ||
248 | * @param cp handle to the connected peer record | ||
249 | * @param ppd peer performance data | ||
250 | */ | ||
251 | static void | ||
252 | consider_request_for_forwarding (void *cls, | ||
253 | const struct GNUNET_PeerIdentity *peer, | ||
254 | struct GSF_ConnectedPeer *cp, | ||
255 | const struct GSF_PeerPerformanceData *ppd) | ||
256 | { | ||
257 | struct GSF_PendingRequest *pr = cls; | ||
258 | |||
259 | GSF_plan_add_ (cp, pr); | ||
260 | } | ||
261 | |||
262 | |||
263 | /** | ||
264 | * Function to be called after we're done processing | ||
265 | * replies from the local lookup. If the result status | ||
266 | * code indicates that there may be more replies, plan | ||
267 | * forwarding the request. | ||
268 | * | ||
269 | * @param cls closure (NULL) | ||
270 | * @param pr the pending request we were processing | ||
271 | * @param result final datastore lookup result | ||
272 | */ | ||
273 | static void | ||
274 | consider_forwarding (void *cls, | ||
275 | struct GSF_PendingRequest *pr, | ||
276 | enum GNUNET_BLOCK_EvaluationResult result) | ||
277 | { | ||
278 | if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) | ||
279 | return; /* we're done... */ | ||
280 | GSF_iterate_connected_peers_ (&consider_request_for_forwarding, | ||
281 | pr); | ||
282 | } | ||
283 | |||
284 | |||
285 | /** | ||
286 | * Handle P2P "GET" request. | ||
287 | * | ||
288 | * @param cls closure, always NULL | ||
289 | * @param other the other peer involved (sender or receiver, NULL | ||
290 | * for loopback messages where we are both sender and receiver) | ||
291 | * @param message the actual message | ||
292 | * @param atsi performance information | ||
293 | * @return GNUNET_OK to keep the connection open, | ||
294 | * GNUNET_SYSERR to close it (signal serious error) | ||
295 | */ | ||
296 | static int | ||
297 | handle_p2p_get (void *cls, | ||
298 | const struct GNUNET_PeerIdentity *other, | ||
299 | const struct GNUNET_MessageHeader *message, | ||
300 | const struct GNUNET_TRANSPORT_ATS_Information *atsi) | ||
301 | { | ||
302 | struct GSF_PendingRequest *pr; | ||
303 | |||
304 | pr = GSF_handle_p2p_query_ (other, message); | ||
305 | if (NULL == pr) | ||
306 | return GNUNET_SYSERR; | ||
307 | GSF_local_lookup_ (pr, | ||
308 | &consider_forwarding, | ||
309 | NULL); | ||
310 | return GNUNET_OK; | ||
311 | } | ||
312 | |||
313 | |||
314 | /** | ||
315 | * We're done with the local lookup, now consider | ||
316 | * P2P processing (depending on request options and | ||
317 | * result status). Also signal that we can now | ||
318 | * receive more request information from the client. | ||
319 | * | ||
320 | * @param cls the client doing the request ('struct GNUNET_SERVER_Client') | ||
321 | * @param pr the pending request we were processing | ||
322 | * @param result final datastore lookup result | ||
323 | */ | ||
324 | static void | ||
325 | start_p2p_processing (void *cls, | ||
326 | struct GSF_PendingRequest *pr, | ||
327 | enum GNUNET_BLOCK_EvaluationResult result) | ||
328 | { | ||
329 | struct GNUNET_SERVER_Client *client = cls; | ||
330 | struct GSF_PendingRequestData *prd; | ||
331 | |||
332 | prd = GSF_pending_request_get_data_ (pr); | ||
333 | #if DEBUG_FS | ||
334 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
335 | "Finished database lookup for local request `%s' with result %d\n", | ||
336 | GNUNET_h2s (&prd->query), | ||
337 | result); | ||
338 | #endif | ||
339 | GNUNET_SERVER_receive_done (client, | ||
340 | GNUNET_OK); | ||
341 | if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) | ||
342 | return; /* we're done, 'pr' was already destroyed... */ | ||
343 | if (0 != (GSF_PRO_LOCAL_ONLY & prd->options) ) | ||
344 | { | ||
345 | GSF_pending_request_cancel_ (pr); | ||
346 | return; | ||
347 | } | ||
348 | GSF_dht_lookup_ (pr); | ||
349 | consider_forwarding (NULL, pr, result); | ||
350 | } | ||
351 | |||
352 | |||
353 | /** | ||
354 | * Handle START_SEARCH-message (search request from client). | ||
355 | * | ||
356 | * @param cls closure | ||
357 | * @param client identification of the client | ||
358 | * @param message the actual message | ||
359 | */ | ||
360 | static void | ||
361 | handle_start_search (void *cls, | ||
362 | struct GNUNET_SERVER_Client *client, | ||
363 | const struct GNUNET_MessageHeader *message) | ||
364 | { | ||
365 | struct GSF_PendingRequest *pr; | ||
366 | |||
367 | pr = GSF_local_client_start_search_handler_ (client, message); | ||
368 | if (NULL == pr) | ||
369 | { | ||
370 | /* 'GNUNET_SERVER_receive_done was already called! */ | ||
371 | return; | ||
372 | } | ||
373 | GSF_local_lookup_ (pr, | ||
374 | &start_p2p_processing, | ||
375 | client); | ||
376 | } | ||
377 | |||
378 | |||
379 | /** | ||
380 | * Task run during shutdown. | ||
381 | * | ||
382 | * @param cls unused | ||
383 | * @param tc unused | ||
384 | */ | ||
385 | static void | ||
386 | shutdown_task (void *cls, | ||
387 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
388 | { | ||
389 | if (NULL != GSF_core) | ||
390 | { | ||
391 | GNUNET_CORE_disconnect (GSF_core); | ||
392 | GSF_core = NULL; | ||
393 | } | ||
394 | GSF_put_done_ (); | ||
395 | GSF_push_done_ (); | ||
396 | GSF_pending_request_done_ (); | ||
397 | GSF_plan_done (); | ||
398 | GSF_connected_peer_done_ (); | ||
399 | GNUNET_DATASTORE_disconnect (GSF_dsh, GNUNET_NO); | ||
400 | GSF_dsh = NULL; | ||
401 | GNUNET_DHT_disconnect (GSF_dht); | ||
402 | GSF_dht = NULL; | ||
403 | GNUNET_BLOCK_context_destroy (GSF_block_ctx); | ||
404 | GSF_block_ctx = NULL; | ||
405 | GNUNET_CONFIGURATION_destroy (block_cfg); | ||
406 | block_cfg = NULL; | ||
407 | GNUNET_STATISTICS_destroy (GSF_stats, GNUNET_NO); | ||
408 | if (GNUNET_SCHEDULER_NO_TASK != cover_age_task) | ||
409 | { | ||
410 | GNUNET_SCHEDULER_cancel (cover_age_task); | ||
411 | cover_age_task = GNUNET_SCHEDULER_NO_TASK; | ||
412 | } | ||
413 | GNUNET_FS_indexing_done (); | ||
414 | GNUNET_LOAD_value_free (datastore_get_load); | ||
415 | datastore_get_load = NULL; | ||
416 | GNUNET_LOAD_value_free (GSF_rt_entry_lifetime); | ||
417 | GSF_rt_entry_lifetime = NULL; | ||
418 | } | ||
419 | |||
420 | |||
421 | /** | ||
422 | * Function called for each pending request whenever a new | ||
423 | * peer connects, giving us a chance to decide about submitting | ||
424 | * the existing request to the new peer. | ||
425 | * | ||
426 | * @param cls the 'struct GSF_ConnectedPeer' of the new peer | ||
427 | * @param key query for the request | ||
428 | * @param pr handle to the pending request | ||
429 | * @return GNUNET_YES to continue to iterate | ||
430 | */ | ||
431 | static int | ||
432 | consider_peer_for_forwarding (void *cls, | ||
433 | const GNUNET_HashCode *key, | ||
434 | struct GSF_PendingRequest *pr) | ||
435 | { | ||
436 | struct GSF_ConnectedPeer *cp = cls; | ||
437 | |||
438 | GSF_plan_add_ (cp, pr); | ||
439 | return GNUNET_YES; | ||
440 | } | ||
441 | |||
442 | |||
443 | /** | ||
444 | * Method called whenever a given peer connects. | ||
445 | * | ||
446 | * @param cls closure, not used | ||
447 | * @param peer peer identity this notification is about | ||
448 | * @param atsi performance information | ||
449 | */ | ||
450 | static void | ||
451 | peer_connect_handler (void *cls, | ||
452 | const struct GNUNET_PeerIdentity *peer, | ||
453 | const struct GNUNET_TRANSPORT_ATS_Information *atsi) | ||
454 | { | ||
455 | struct GSF_ConnectedPeer *cp; | ||
456 | |||
457 | if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity))) | ||
458 | return; | ||
459 | cp = GSF_peer_connect_handler_ (peer, atsi); | ||
460 | if (NULL == cp) | ||
461 | return; | ||
462 | GSF_iterate_pending_requests_ (&consider_peer_for_forwarding, | ||
463 | cp); | ||
464 | } | ||
465 | |||
466 | |||
467 | /** | ||
468 | * Function called after GNUNET_CORE_connect has succeeded | ||
469 | * (or failed for good). Note that the private key of the | ||
470 | * peer is intentionally not exposed here; if you need it, | ||
471 | * your process should try to read the private key file | ||
472 | * directly (which should work if you are authorized...). | ||
473 | * | ||
474 | * @param cls closure | ||
475 | * @param server handle to the server, NULL if we failed | ||
476 | * @param my_identity ID of this peer, NULL if we failed | ||
477 | * @param publicKey public key of this peer, NULL if we failed | ||
478 | */ | ||
479 | static void | ||
480 | peer_init_handler (void *cls, | ||
481 | struct GNUNET_CORE_Handle * server, | ||
482 | const struct GNUNET_PeerIdentity * | ||
483 | my_identity, | ||
484 | const struct | ||
485 | GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded * | ||
486 | publicKey) | ||
487 | { | ||
488 | my_id = *my_identity; | ||
489 | } | ||
490 | |||
491 | |||
492 | /** | ||
493 | * Process fs requests. | ||
494 | * | ||
495 | * @param server the initialized server | ||
496 | * @param c configuration to use | ||
497 | */ | ||
498 | static int | ||
499 | main_init (struct GNUNET_SERVER_Handle *server, | ||
500 | const struct GNUNET_CONFIGURATION_Handle *c) | ||
501 | { | ||
502 | static const struct GNUNET_CORE_MessageHandler p2p_handlers[] = | ||
503 | { | ||
504 | { &handle_p2p_get, | ||
505 | GNUNET_MESSAGE_TYPE_FS_GET, 0 }, | ||
506 | { &handle_p2p_put, | ||
507 | GNUNET_MESSAGE_TYPE_FS_PUT, 0 }, | ||
508 | { &GSF_handle_p2p_migration_stop_, | ||
509 | GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP, | ||
510 | sizeof (struct MigrationStopMessage) }, | ||
511 | { NULL, 0, 0 } | ||
512 | }; | ||
513 | static const struct GNUNET_SERVER_MessageHandler handlers[] = { | ||
514 | {&GNUNET_FS_handle_index_start, NULL, | ||
515 | GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0}, | ||
516 | {&GNUNET_FS_handle_index_list_get, NULL, | ||
517 | GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) }, | ||
518 | {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, | ||
519 | sizeof (struct UnindexMessage) }, | ||
520 | {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, | ||
521 | 0 }, | ||
522 | {NULL, NULL, 0, 0} | ||
523 | }; | ||
524 | |||
525 | GSF_core = GNUNET_CORE_connect (GSF_cfg, | ||
526 | 2, /* larger? */ | ||
527 | NULL, | ||
528 | &peer_init_handler, | ||
529 | &peer_connect_handler, | ||
530 | &GSF_peer_disconnect_handler_, | ||
531 | &GSF_peer_status_handler_, | ||
532 | NULL, GNUNET_NO, | ||
533 | NULL, GNUNET_NO, | ||
534 | p2p_handlers); | ||
535 | if (NULL == GSF_core) | ||
536 | { | ||
537 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
538 | _("Failed to connect to `%s' service.\n"), | ||
539 | "core"); | ||
540 | return GNUNET_SYSERR; | ||
541 | } | ||
542 | GNUNET_SERVER_disconnect_notify (server, | ||
543 | &GSF_client_disconnect_handler_, | ||
544 | NULL); | ||
545 | GNUNET_SERVER_add_handlers (server, handlers); | ||
546 | cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, | ||
547 | &age_cover_counters, | ||
548 | NULL); | ||
549 | datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); | ||
550 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, | ||
551 | &shutdown_task, | ||
552 | NULL); | ||
553 | return GNUNET_OK; | ||
554 | } | ||
555 | |||
556 | |||
557 | /** | ||
558 | * Process fs requests. | ||
559 | * | ||
560 | * @param cls closure | ||
561 | * @param server the initialized server | ||
562 | * @param cfg configuration to use | ||
563 | */ | ||
564 | static void | ||
565 | run (void *cls, | ||
566 | struct GNUNET_SERVER_Handle *server, | ||
567 | const struct GNUNET_CONFIGURATION_Handle *cfg) | ||
568 | { | ||
569 | GSF_cfg = cfg; | ||
570 | GSF_dsh = GNUNET_DATASTORE_connect (cfg); | ||
571 | if (NULL == GSF_dsh) | ||
572 | { | ||
573 | GNUNET_SCHEDULER_shutdown (); | ||
574 | return; | ||
575 | } | ||
576 | GSF_rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL); | ||
577 | GSF_stats = GNUNET_STATISTICS_create ("fs", cfg); | ||
578 | block_cfg = GNUNET_CONFIGURATION_create (); | ||
579 | GNUNET_CONFIGURATION_set_value_string (block_cfg, | ||
580 | "block", | ||
581 | "PLUGINS", | ||
582 | "fs"); | ||
583 | GSF_block_ctx = GNUNET_BLOCK_context_create (block_cfg); | ||
584 | GNUNET_assert (NULL != GSF_block_ctx); | ||
585 | GSF_dht = GNUNET_DHT_connect (cfg, | ||
586 | FS_DHT_HT_SIZE); | ||
587 | GSF_plan_init (); | ||
588 | GSF_pending_request_init_ (); | ||
589 | GSF_connected_peer_init_ (); | ||
590 | GSF_push_init_ (); | ||
591 | GSF_put_init_ (); | ||
592 | if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, GSF_dsh)) || | ||
593 | |||
594 | (GNUNET_OK != main_init (server, cfg)) ) | ||
595 | { | ||
596 | GNUNET_SCHEDULER_shutdown (); | ||
597 | shutdown_task (NULL, NULL); | ||
598 | return; | ||
599 | } | ||
600 | } | ||
601 | |||
602 | |||
603 | /** | ||
604 | * The main function for the fs service. | ||
605 | * | ||
606 | * @param argc number of arguments from the command line | ||
607 | * @param argv command line arguments | ||
608 | * @return 0 ok, 1 on error | ||
609 | */ | ||
610 | int | ||
611 | main (int argc, char *const *argv) | ||
612 | { | ||
613 | return (GNUNET_OK == | ||
614 | GNUNET_SERVICE_run (argc, | ||
615 | argv, | ||
616 | "fs", | ||
617 | GNUNET_SERVICE_OPTION_NONE, | ||
618 | &run, NULL)) ? 0 : 1; | ||
619 | } | ||
620 | |||
621 | /* end of gnunet-service-fs.c */ | ||