diff options
Diffstat (limited to 'src/fs/gnunet-service-fs_push.c')
-rw-r--r-- | src/fs/gnunet-service-fs_push.c | 530 |
1 files changed, 348 insertions, 182 deletions
diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c index 9f515e2ee..2180a520d 100644 --- a/src/fs/gnunet-service-fs_push.c +++ b/src/fs/gnunet-service-fs_push.c | |||
@@ -27,9 +27,6 @@ | |||
27 | #include "platform.h" | 27 | #include "platform.h" |
28 | #include "gnunet-service-fs_push.h" | 28 | #include "gnunet-service-fs_push.h" |
29 | 29 | ||
30 | |||
31 | /* FIXME: below are only old code fragments to use... */ | ||
32 | |||
33 | /** | 30 | /** |
34 | * Block that is ready for migration to other peers. Actual data is at the end of the block. | 31 | * Block that is ready for migration to other peers. Actual data is at the end of the block. |
35 | */ | 32 | */ |
@@ -57,7 +54,7 @@ struct MigrationReadyBlock | |||
57 | struct GNUNET_TIME_Absolute expiration; | 54 | struct GNUNET_TIME_Absolute expiration; |
58 | 55 | ||
59 | /** | 56 | /** |
60 | * Peers we would consider forwarding this | 57 | * Peers we already forwarded this |
61 | * block to. Zero for empty entries. | 58 | * block to. Zero for empty entries. |
62 | */ | 59 | */ |
63 | GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE]; | 60 | GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE]; |
@@ -80,6 +77,40 @@ struct MigrationReadyBlock | |||
80 | 77 | ||
81 | 78 | ||
82 | /** | 79 | /** |
80 | * Information about a peer waiting for | ||
81 | * migratable data. | ||
82 | */ | ||
83 | struct MigrationReadyPeer | ||
84 | { | ||
85 | /** | ||
86 | * This is a doubly-linked list. | ||
87 | */ | ||
88 | struct MigrationReadyPeer *next; | ||
89 | |||
90 | /** | ||
91 | * This is a doubly-linked list. | ||
92 | */ | ||
93 | struct MigrationReadyPeer *prev; | ||
94 | |||
95 | /** | ||
96 | * Handle to peer. | ||
97 | */ | ||
98 | struct GSF_ConnectedPeer *peer; | ||
99 | |||
100 | /** | ||
101 | * Handle for current transmission request, | ||
102 | * or NULL for none. | ||
103 | */ | ||
104 | struct GSF_PeerTransmitHandle *th; | ||
105 | |||
106 | /** | ||
107 | * Message we are trying to push right now (or NULL) | ||
108 | */ | ||
109 | struct PutMessage *msg; | ||
110 | }; | ||
111 | |||
112 | |||
113 | /** | ||
83 | * Head of linked list of blocks that can be migrated. | 114 | * Head of linked list of blocks that can be migrated. |
84 | */ | 115 | */ |
85 | static struct MigrationReadyBlock *mig_head; | 116 | static struct MigrationReadyBlock *mig_head; |
@@ -90,6 +121,16 @@ static struct MigrationReadyBlock *mig_head; | |||
90 | static struct MigrationReadyBlock *mig_tail; | 121 | static struct MigrationReadyBlock *mig_tail; |
91 | 122 | ||
92 | /** | 123 | /** |
124 | * Head of linked list of peers. | ||
125 | */ | ||
126 | static struct MigrationReadyPeer *peer_head; | ||
127 | |||
128 | /** | ||
129 | * Tail of linked list of peers. | ||
130 | */ | ||
131 | static struct MigrationReadyPeer *peer_tail; | ||
132 | |||
133 | /** | ||
93 | * Request to datastore for migration (or NULL). | 134 | * Request to datastore for migration (or NULL). |
94 | */ | 135 | */ |
95 | static struct GNUNET_DATASTORE_QueueEntry *mig_qe; | 136 | static struct GNUNET_DATASTORE_QueueEntry *mig_qe; |
@@ -106,14 +147,14 @@ static GNUNET_SCHEDULER_TaskIdentifier mig_task; | |||
106 | static struct GNUNET_TIME_Relative min_migration_delay; | 147 | static struct GNUNET_TIME_Relative min_migration_delay; |
107 | 148 | ||
108 | /** | 149 | /** |
109 | * Are we allowed to push out content from this peer. | 150 | * Size of the doubly-linked list of migration blocks. |
110 | */ | 151 | */ |
111 | static int active_from_migration; | 152 | static unsigned int mig_size; |
112 | 153 | ||
113 | /** | 154 | /** |
114 | * Size of the doubly-linked list of migration blocks. | 155 | * Is this module enabled? |
115 | */ | 156 | */ |
116 | static unsigned int mig_size; | 157 | static int enabled; |
117 | 158 | ||
118 | 159 | ||
119 | /** | 160 | /** |
@@ -135,134 +176,212 @@ delete_migration_block (struct MigrationReadyBlock *mb) | |||
135 | 176 | ||
136 | 177 | ||
137 | /** | 178 | /** |
138 | * Compare the distance of two peers to a key. | 179 | * Find content for migration to this peer. |
180 | */ | ||
181 | static void | ||
182 | find_content (struct MigrationReadyPeer *mrp); | ||
183 | |||
184 | |||
185 | /** | ||
186 | * Transmit the message currently scheduled for | ||
187 | * transmission. | ||
139 | * | 188 | * |
140 | * @param key key | 189 | * @param cls the 'struct MigrationReadyPeer' |
141 | * @param p1 first peer | 190 | * @param buf_size number of bytes available in buf |
142 | * @param p2 second peer | 191 | * @param buf where to copy the message, NULL on error (peer disconnect) |
143 | * @return GNUNET_YES if P1 is closer to key than P2 | 192 | * @return number of bytes copied to 'buf', can be 0 (without indicating an error) |
144 | */ | 193 | */ |
145 | static int | 194 | static size_t |
146 | is_closer (const GNUNET_HashCode *key, | 195 | transmit_message (void *cls, |
147 | const struct GNUNET_PeerIdentity *p1, | 196 | size_t buf_size, |
148 | const struct GNUNET_PeerIdentity *p2) | 197 | void *buf) |
149 | { | 198 | { |
150 | return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey, | 199 | struct MigrationReadyPeer *peer = cls; |
151 | &p2->hashPubKey, | 200 | struct PutMessage *msg; |
152 | key); | 201 | uint16_t msize; |
202 | |||
203 | peer->th = NULL; | ||
204 | msg = peer->msg; | ||
205 | peer->msg = NULL; | ||
206 | if (buf == NULL) | ||
207 | { | ||
208 | GNUNET_free (msg); | ||
209 | return 0; | ||
210 | } | ||
211 | msize = ntohs (msg->header.size); | ||
212 | GNUNET_assert (msize <= buf_size); | ||
213 | memcpy (buf, msg, msize); | ||
214 | GNUNET_free (msg); | ||
215 | find_content (peer); | ||
216 | return msize; | ||
153 | } | 217 | } |
154 | 218 | ||
155 | 219 | ||
156 | /** | 220 | /** |
157 | * Consider migrating content to a given peer. | 221 | * Send the given block to the given peer. |
158 | * | 222 | * |
159 | * @param cls 'struct MigrationReadyBlock*' to select | 223 | * @param peer target peer |
160 | * targets for (or NULL for none) | 224 | * @param block the block |
161 | * @param key ID of the peer | 225 | * @return GNUNET_YES if the block was deleted (!) |
162 | * @param value 'struct ConnectedPeer' of the peer | ||
163 | * @return GNUNET_YES (always continue iteration) | ||
164 | */ | 226 | */ |
165 | static int | 227 | static int |
166 | consider_migration (void *cls, | 228 | transmit_content (struct MigrationReadyPeer *peer, |
167 | const GNUNET_HashCode *key, | 229 | struct MigrationReadyBlock *block) |
168 | void *value) | ||
169 | { | 230 | { |
170 | struct MigrationReadyBlock *mb = cls; | ||
171 | struct ConnectedPeer *cp = value; | ||
172 | struct MigrationReadyBlock *pos; | ||
173 | struct GNUNET_PeerIdentity cppid; | ||
174 | struct GNUNET_PeerIdentity otherpid; | ||
175 | struct GNUNET_PeerIdentity worstpid; | ||
176 | size_t msize; | 231 | size_t msize; |
177 | unsigned int i; | 232 | struct PutMessage *msg; |
178 | unsigned int repl; | 233 | unsigned int i; |
234 | struct GSF_PeerPerformanceData *ppd; | ||
235 | int ret; | ||
236 | |||
237 | ppd = GSF_get_peer_performance_data (peer->peer); | ||
238 | GNUNET_assert (NULL == peer->th); | ||
239 | msize = sizeof (struct PutMessage) + block->size; | ||
240 | msg = GNUNET_malloc (msize); | ||
241 | msg->header.type = htons (42); | ||
242 | msg->header.size = htons (msize); | ||
179 | 243 | ||
180 | /* consider 'cp' as a migration target for mb */ | 244 | memcpy (&msg[1], |
181 | if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0) | 245 | &block[1], |
182 | return GNUNET_YES; /* peer has requested no migration! */ | 246 | block->size); |
183 | if (mb != NULL) | 247 | peer->msg = msg; |
248 | for (i=0;i<MIGRATION_LIST_SIZE;i++) | ||
184 | { | 249 | { |
185 | GNUNET_PEER_resolve (cp->pid, | 250 | if (block->target_list[i] == 0) |
186 | &cppid); | ||
187 | repl = MIGRATION_LIST_SIZE; | ||
188 | for (i=0;i<MIGRATION_LIST_SIZE;i++) | ||
189 | { | 251 | { |
190 | if (mb->target_list[i] == 0) | 252 | block->target_list[i] = ppd->pid; |
191 | { | 253 | GNUNET_PEER_change_rc (block->target_list[i], 1); |
192 | mb->target_list[i] = cp->pid; | 254 | break; |
193 | GNUNET_PEER_change_rc (mb->target_list[i], 1); | ||
194 | repl = MIGRATION_LIST_SIZE; | ||
195 | break; | ||
196 | } | ||
197 | GNUNET_PEER_resolve (mb->target_list[i], | ||
198 | &otherpid); | ||
199 | if ( (repl == MIGRATION_LIST_SIZE) && | ||
200 | is_closer (&mb->query, | ||
201 | &cppid, | ||
202 | &otherpid)) | ||
203 | { | ||
204 | repl = i; | ||
205 | worstpid = otherpid; | ||
206 | } | ||
207 | else if ( (repl != MIGRATION_LIST_SIZE) && | ||
208 | (is_closer (&mb->query, | ||
209 | &worstpid, | ||
210 | &otherpid) ) ) | ||
211 | { | ||
212 | repl = i; | ||
213 | worstpid = otherpid; | ||
214 | } | ||
215 | } | ||
216 | if (repl != MIGRATION_LIST_SIZE) | ||
217 | { | ||
218 | GNUNET_PEER_change_rc (mb->target_list[repl], -1); | ||
219 | mb->target_list[repl] = cp->pid; | ||
220 | GNUNET_PEER_change_rc (mb->target_list[repl], 1); | ||
221 | } | 255 | } |
222 | } | 256 | } |
257 | if (MIGRATION_LIST_SIZE == i) | ||
258 | { | ||
259 | delete_migration_block (block); | ||
260 | ret = GNUNET_YES; | ||
261 | } | ||
262 | else | ||
263 | { | ||
264 | ret = GNUNET_NO; | ||
265 | } | ||
266 | peer->th = GSF_peer_transmit_ (peer->peer, | ||
267 | GNUNET_NO, | ||
268 | 0 /* priority */, | ||
269 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
270 | msize, | ||
271 | &transmit_message, | ||
272 | peer); | ||
273 | return ret; | ||
274 | } | ||
223 | 275 | ||
224 | /* consider scheduling transmission to cp for content migration */ | 276 | |
225 | if (cp->cth != NULL) | 277 | /** |
226 | return GNUNET_YES; | 278 | * Count the number of peers this block has |
227 | msize = 0; | 279 | * already been forwarded to. |
228 | pos = mig_head; | 280 | * |
229 | while (pos != NULL) | 281 | * @param block the block |
282 | * @return number of times block was forwarded | ||
283 | */ | ||
284 | static unsigned int | ||
285 | count_targets (struct MigrationReadyBlock *block) | ||
286 | { | ||
287 | unsigned int i; | ||
288 | |||
289 | for (i=0;i<MIGRATION_LIST_SIZE;i++) | ||
290 | if (block->target_list[i] == 0) | ||
291 | return i; | ||
292 | return i; | ||
293 | } | ||
294 | |||
295 | |||
296 | /** | ||
297 | * Check if sending this block to this peer would | ||
298 | * be a good idea. | ||
299 | * | ||
300 | * @param peer target peer | ||
301 | * @param block the block | ||
302 | * @return score (>= 0: feasible, negative: infeasible) | ||
303 | */ | ||
304 | static long | ||
305 | score_content (struct MigrationReadyPeer *peer, | ||
306 | struct MigrationReadyBlock *block) | ||
307 | { | ||
308 | unsigned int i; | ||
309 | struct GSF_PeerPerformanceData *ppd; | ||
310 | struct GNUNET_PeerIdentity id; | ||
311 | uint32_t dist; | ||
312 | |||
313 | ppd = GSF_get_peer_performance_data (peer->peer); | ||
314 | for (i=0;i<MIGRATION_LIST_SIZE;i++) | ||
315 | if (mb->target_list[i] == ppd->pid) | ||
316 | return -1; | ||
317 | GSF_connected_peer_get_identity (peer->peer, | ||
318 | &id); | ||
319 | dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, | ||
320 | &id.hashPubKey); | ||
321 | /* closer distance, higher score: */ | ||
322 | return UINT32_MAX - dist; | ||
323 | } | ||
324 | |||
325 | |||
326 | /** | ||
327 | * If the migration task is not currently running, consider | ||
328 | * (re)scheduling it with the appropriate delay. | ||
329 | */ | ||
330 | static void | ||
331 | consider_gathering (void); | ||
332 | |||
333 | |||
334 | /** | ||
335 | * Find content for migration to this peer. | ||
336 | * | ||
337 | * @param mig peer to find content for | ||
338 | */ | ||
339 | static void | ||
340 | find_content (struct MigrationReadyPeer *mrp) | ||
341 | { | ||
342 | struct MigrationReadyBlock *pos; | ||
343 | long score; | ||
344 | long best_score; | ||
345 | struct MigrationReadyBlock *best; | ||
346 | |||
347 | GNUNET_assert (NULL == mrp->th); | ||
348 | best = NULL; | ||
349 | best_score = -1; | ||
350 | pos = mig_qe; | ||
351 | while (NULL != pos) | ||
230 | { | 352 | { |
231 | for (i=0;i<MIGRATION_LIST_SIZE;i++) | 353 | score = score_content (mrp, pos); |
354 | if (score > best_score) | ||
232 | { | 355 | { |
233 | if (cp->pid == pos->target_list[i]) | 356 | best_score = score; |
234 | { | 357 | best = mrp; |
235 | if (msize == 0) | ||
236 | msize = pos->size; | ||
237 | else | ||
238 | msize = GNUNET_MIN (msize, | ||
239 | pos->size); | ||
240 | break; | ||
241 | } | ||
242 | } | 358 | } |
243 | pos = pos->next; | 359 | pos = pos->next; |
244 | } | 360 | } |
245 | if (msize == 0) | 361 | if ( (NULL == best) && |
246 | return GNUNET_YES; /* no content available */ | 362 | (mig_size >= MAX_MIGRATION_QUEUE) ) |
247 | #if DEBUG_FS | ||
248 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
249 | "Trying to migrate at least %u bytes to peer `%s'\n", | ||
250 | msize, | ||
251 | GNUNET_h2s (key)); | ||
252 | #endif | ||
253 | if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK) | ||
254 | { | 363 | { |
255 | GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task); | 364 | /* failed to find migration target AND |
256 | cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; | 365 | queue is full, purge most-forwarded |
366 | block from queue to make room for more */ | ||
367 | score = 0; | ||
368 | pos = mig_qe; | ||
369 | while (NULL != pos) | ||
370 | { | ||
371 | score = count_targets (pos); | ||
372 | if (score >= best_score) | ||
373 | { | ||
374 | best_score = score; | ||
375 | best = mrp; | ||
376 | } | ||
377 | pos = pos->next; | ||
378 | } | ||
379 | GNUNET_assert (NULL != best); | ||
380 | delete_migration_block (best); | ||
381 | consider_gathering (); | ||
382 | return; | ||
257 | } | 383 | } |
258 | cp->cth | 384 | transmit_content (peer, best); |
259 | = GNUNET_CORE_notify_transmit_ready (core, | ||
260 | 0, GNUNET_TIME_UNIT_FOREVER_REL, | ||
261 | (const struct GNUNET_PeerIdentity*) key, | ||
262 | msize + sizeof (struct PutMessage), | ||
263 | &transmit_to_peer, | ||
264 | cp); | ||
265 | return GNUNET_YES; | ||
266 | } | 385 | } |
267 | 386 | ||
268 | 387 | ||
@@ -278,23 +397,23 @@ gather_migration_blocks (void *cls, | |||
278 | const struct GNUNET_SCHEDULER_TaskContext *tc); | 397 | const struct GNUNET_SCHEDULER_TaskContext *tc); |
279 | 398 | ||
280 | 399 | ||
281 | |||
282 | |||
283 | /** | 400 | /** |
284 | * If the migration task is not currently running, consider | 401 | * If the migration task is not currently running, consider |
285 | * (re)scheduling it with the appropriate delay. | 402 | * (re)scheduling it with the appropriate delay. |
286 | */ | 403 | */ |
287 | static void | 404 | static void |
288 | consider_migration_gathering () | 405 | consider_gathering () |
289 | { | 406 | { |
290 | struct GNUNET_TIME_Relative delay; | 407 | struct GNUNET_TIME_Relative delay; |
291 | 408 | ||
292 | if (dsh == NULL) | 409 | if (GSF_dsh == NULL) |
293 | return; | 410 | return; |
294 | if (mig_qe != NULL) | 411 | if (mig_qe != NULL) |
295 | return; | 412 | return; |
296 | if (mig_task != GNUNET_SCHEDULER_NO_TASK) | 413 | if (mig_task != GNUNET_SCHEDULER_NO_TASK) |
297 | return; | 414 | return; |
415 | if (mig_size >= MAX_MIGRATION_QUEUE) | ||
416 | return; | ||
298 | delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, | 417 | delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, |
299 | mig_size); | 418 | mig_size); |
300 | delay = GNUNET_TIME_relative_divide (delay, | 419 | delay = GNUNET_TIME_relative_divide (delay, |
@@ -307,8 +426,6 @@ consider_migration_gathering () | |||
307 | } | 426 | } |
308 | 427 | ||
309 | 428 | ||
310 | |||
311 | |||
312 | /** | 429 | /** |
313 | * Process content offered for migration. | 430 | * Process content offered for migration. |
314 | * | 431 | * |
@@ -335,19 +452,19 @@ process_migration_content (void *cls, | |||
335 | expiration, uint64_t uid) | 452 | expiration, uint64_t uid) |
336 | { | 453 | { |
337 | struct MigrationReadyBlock *mb; | 454 | struct MigrationReadyBlock *mb; |
455 | struct MigrationReadyPeer *pos; | ||
338 | 456 | ||
339 | if (key == NULL) | 457 | if (key == NULL) |
340 | { | 458 | { |
341 | mig_qe = NULL; | 459 | mig_qe = NULL; |
342 | if (mig_size < MAX_MIGRATION_QUEUE) | 460 | consider_gathering (); |
343 | consider_migration_gathering (); | ||
344 | return; | 461 | return; |
345 | } | 462 | } |
346 | if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < | 463 | if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < |
347 | MIN_MIGRATION_CONTENT_LIFETIME.rel_value) | 464 | MIN_MIGRATION_CONTENT_LIFETIME.rel_value) |
348 | { | 465 | { |
349 | /* content will expire soon, don't bother */ | 466 | /* content will expire soon, don't bother */ |
350 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | 467 | GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES); |
351 | return; | 468 | return; |
352 | } | 469 | } |
353 | if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) | 470 | if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) |
@@ -359,7 +476,7 @@ process_migration_content (void *cls, | |||
359 | &process_migration_content, | 476 | &process_migration_content, |
360 | NULL)) | 477 | NULL)) |
361 | { | 478 | { |
362 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | 479 | GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES); |
363 | } | 480 | } |
364 | return; | 481 | return; |
365 | } | 482 | } |
@@ -380,14 +497,20 @@ process_migration_content (void *cls, | |||
380 | mig_tail, | 497 | mig_tail, |
381 | mb); | 498 | mb); |
382 | mig_size++; | 499 | mig_size++; |
383 | GNUNET_CONTAINER_multihashmap_iterate (connected_peers, | 500 | pos = peer_head; |
384 | &consider_migration, | 501 | while (pos != NULL) |
385 | mb); | 502 | { |
386 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | 503 | if (NULL == pos->th) |
504 | { | ||
505 | if (GNUNET_YES == transmit_content (pos, mb)) | ||
506 | break; /* 'mb' was freed! */ | ||
507 | } | ||
508 | pos = pos->next; | ||
509 | } | ||
510 | GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES); | ||
387 | } | 511 | } |
388 | 512 | ||
389 | 513 | ||
390 | |||
391 | /** | 514 | /** |
392 | * Task that is run periodically to obtain blocks for content | 515 | * Task that is run periodically to obtain blocks for content |
393 | * migration | 516 | * migration |
@@ -400,9 +523,10 @@ gather_migration_blocks (void *cls, | |||
400 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 523 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
401 | { | 524 | { |
402 | mig_task = GNUNET_SCHEDULER_NO_TASK; | 525 | mig_task = GNUNET_SCHEDULER_NO_TASK; |
403 | if (dsh != NULL) | 526 | if (GSF_dsh != NULL) |
404 | { | 527 | { |
405 | mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX, | 528 | mig_qe = GNUNET_DATASTORE_get_random (GSF_dsh, |
529 | 0, UINT_MAX, | ||
406 | GNUNET_TIME_UNIT_FOREVER_REL, | 530 | GNUNET_TIME_UNIT_FOREVER_REL, |
407 | &process_migration_content, NULL); | 531 | &process_migration_content, NULL); |
408 | GNUNET_assert (mig_qe != NULL); | 532 | GNUNET_assert (mig_qe != NULL); |
@@ -410,66 +534,108 @@ gather_migration_blocks (void *cls, | |||
410 | } | 534 | } |
411 | 535 | ||
412 | 536 | ||
537 | /** | ||
538 | * A peer connected to us. Start pushing content | ||
539 | * to this peer. | ||
540 | * | ||
541 | * @param peer handle for the peer that connected | ||
542 | */ | ||
543 | void | ||
544 | GSF_push_start_ (struct GSF_ConnectedPeer *peer) | ||
545 | { | ||
546 | struct MigrationReadyPeer *mrp; | ||
413 | 547 | ||
414 | size_t | 548 | if (GNUNET_YES != enabled) |
415 | API_ (void *cls, | 549 | return; |
416 | size_t size, void *buf) | 550 | mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer)); |
551 | mrp->peer = peer; | ||
552 | find_content (mrp); | ||
553 | GNUNET_CONTAINER_DLL_insert (peer_head, | ||
554 | peer_tail, | ||
555 | mrp); | ||
556 | } | ||
557 | |||
558 | |||
559 | /** | ||
560 | * A peer disconnected from us. Stop pushing content | ||
561 | * to this peer. | ||
562 | * | ||
563 | * @param peer handle for the peer that disconnected | ||
564 | */ | ||
565 | void | ||
566 | GSF_push_stop_ (struct GSF_ConnectedPeer *peer) | ||
417 | { | 567 | { |
418 | next = mig_head; | 568 | struct MigrationReadyPeer *pos; |
419 | while (NULL != (mb = next)) | 569 | |
570 | pos = peer_head; | ||
571 | while (pos != NULL) | ||
572 | { | ||
573 | if (pos->peer == peer) | ||
420 | { | 574 | { |
421 | next = mb->next; | 575 | GNUNET_CONTAINER_DLL_remove (peer_head, |
422 | for (i=0;i<MIGRATION_LIST_SIZE;i++) | 576 | peer_tail, |
423 | { | 577 | pos); |
424 | if ( (cp->pid == mb->target_list[i]) && | 578 | if (NULL != pos->th) |
425 | (mb->size + sizeof (migm) <= size) ) | 579 | GSF_peer_transmit_cancel_ (pos->th); |
426 | { | 580 | GNUNET_free (pos); |
427 | GNUNET_PEER_change_rc (mb->target_list[i], -1); | 581 | return; |
428 | mb->target_list[i] = 0; | ||
429 | mb->used_targets++; | ||
430 | memset (&migm, 0, sizeof (migm)); | ||
431 | migm.header.size = htons (sizeof (migm) + mb->size); | ||
432 | migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); | ||
433 | migm.type = htonl (mb->type); | ||
434 | migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration); | ||
435 | memcpy (&cbuf[msize], &migm, sizeof (migm)); | ||
436 | msize += sizeof (migm); | ||
437 | size -= sizeof (migm); | ||
438 | memcpy (&cbuf[msize], &mb[1], mb->size); | ||
439 | msize += mb->size; | ||
440 | size -= mb->size; | ||
441 | #if DEBUG_FS | ||
442 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
443 | "Pushing migration block `%s' (%u bytes) to `%s'\n", | ||
444 | GNUNET_h2s (&mb->query), | ||
445 | (unsigned int) mb->size, | ||
446 | GNUNET_i2s (&pid)); | ||
447 | #endif | ||
448 | break; | ||
449 | } | ||
450 | else | ||
451 | { | ||
452 | #if DEBUG_FS | ||
453 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
454 | "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n", | ||
455 | GNUNET_h2s (&mb->query), | ||
456 | (unsigned int) mb->size, | ||
457 | GNUNET_i2s (&pid)); | ||
458 | #endif | ||
459 | } | ||
460 | } | ||
461 | if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) || | ||
462 | (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) ) | ||
463 | { | ||
464 | delete_migration_block (mb); | ||
465 | consider_migration_gathering (); | ||
466 | } | ||
467 | } | 582 | } |
468 | consider_migration (NULL, | 583 | pos = pos->next; |
469 | &pid.hashPubKey, | 584 | } |
470 | cp); | 585 | } |
586 | |||
471 | 587 | ||
588 | /** | ||
589 | * Setup the module. | ||
590 | * | ||
591 | * @param cfg configuration to use | ||
592 | */ | ||
593 | void | ||
594 | GSF_push_init_ (struct GNUNET_CONFIGURATION_Handle *cfg) | ||
595 | { | ||
596 | int enabled; | ||
597 | |||
598 | enabled = GNUNET_CONFIGURATION_get_value_yesno (cfg, | ||
599 | "FS", | ||
600 | "CONTENT_PUSHING"); | ||
601 | if (GNUNET_YES != enabled) | ||
602 | return; | ||
603 | |||
604 | if (GNUNET_OK != | ||
605 | GNUNET_CONFIGURATION_get_value_time (cfg, | ||
606 | "fs", | ||
607 | "MIN_MIGRATION_DELAY", | ||
608 | &min_migration_delay)) | ||
609 | { | ||
610 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
611 | _("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"), | ||
612 | "MIN_MIGRATION_DELAY", | ||
613 | "fs"); | ||
614 | return; | ||
615 | } | ||
616 | consider_gathering (); | ||
472 | } | 617 | } |
473 | 618 | ||
474 | 619 | ||
620 | /** | ||
621 | * Shutdown the module. | ||
622 | */ | ||
623 | void | ||
624 | GSF_push_done_ () | ||
625 | { | ||
626 | if (GNUNET_SCHEDULER_NO_TASK != mig_task) | ||
627 | { | ||
628 | GNUNET_SCHEDULER_cancel (mig_task); | ||
629 | mig_task = GNUNET_SCHEDULER_NO_TASK; | ||
630 | } | ||
631 | if (NULL != mig_qe) | ||
632 | { | ||
633 | GNUNET_DATASTORE_cancel (mig_qe); | ||
634 | mig_qe = NULL; | ||
635 | } | ||
636 | while (NULL != mig_head) | ||
637 | delete_migration_block (mig_head); | ||
638 | GNUNET_assert (0 == mig_size); | ||
639 | } | ||
475 | 640 | ||
641 | /* end of gnunet-service-fs_push.c */ | ||