aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_push.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_push.c')
-rw-r--r--src/fs/gnunet-service-fs_push.c530
1 files changed, 348 insertions, 182 deletions
diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c
index 9f515e2ee..2180a520d 100644
--- a/src/fs/gnunet-service-fs_push.c
+++ b/src/fs/gnunet-service-fs_push.c
@@ -27,9 +27,6 @@
27#include "platform.h" 27#include "platform.h"
28#include "gnunet-service-fs_push.h" 28#include "gnunet-service-fs_push.h"
29 29
30
31/* FIXME: below are only old code fragments to use... */
32
33/** 30/**
34 * Block that is ready for migration to other peers. Actual data is at the end of the block. 31 * Block that is ready for migration to other peers. Actual data is at the end of the block.
35 */ 32 */
@@ -57,7 +54,7 @@ struct MigrationReadyBlock
57 struct GNUNET_TIME_Absolute expiration; 54 struct GNUNET_TIME_Absolute expiration;
58 55
59 /** 56 /**
60 * Peers we would consider forwarding this 57 * Peers we already forwarded this
61 * block to. Zero for empty entries. 58 * block to. Zero for empty entries.
62 */ 59 */
63 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE]; 60 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
@@ -80,6 +77,40 @@ struct MigrationReadyBlock
80 77
81 78
82/** 79/**
80 * Information about a peer waiting for
81 * migratable data.
82 */
83struct MigrationReadyPeer
84{
85 /**
86 * This is a doubly-linked list.
87 */
88 struct MigrationReadyPeer *next;
89
90 /**
91 * This is a doubly-linked list.
92 */
93 struct MigrationReadyPeer *prev;
94
95 /**
96 * Handle to peer.
97 */
98 struct GSF_ConnectedPeer *peer;
99
100 /**
101 * Handle for current transmission request,
102 * or NULL for none.
103 */
104 struct GSF_PeerTransmitHandle *th;
105
106 /**
107 * Message we are trying to push right now (or NULL)
108 */
109 struct PutMessage *msg;
110};
111
112
113/**
83 * Head of linked list of blocks that can be migrated. 114 * Head of linked list of blocks that can be migrated.
84 */ 115 */
85static struct MigrationReadyBlock *mig_head; 116static struct MigrationReadyBlock *mig_head;
@@ -90,6 +121,16 @@ static struct MigrationReadyBlock *mig_head;
90static struct MigrationReadyBlock *mig_tail; 121static struct MigrationReadyBlock *mig_tail;
91 122
92/** 123/**
124 * Head of linked list of peers.
125 */
126static struct MigrationReadyPeer *peer_head;
127
128/**
129 * Tail of linked list of peers.
130 */
131static struct MigrationReadyPeer *peer_tail;
132
133/**
93 * Request to datastore for migration (or NULL). 134 * Request to datastore for migration (or NULL).
94 */ 135 */
95static struct GNUNET_DATASTORE_QueueEntry *mig_qe; 136static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
@@ -106,14 +147,14 @@ static GNUNET_SCHEDULER_TaskIdentifier mig_task;
106static struct GNUNET_TIME_Relative min_migration_delay; 147static struct GNUNET_TIME_Relative min_migration_delay;
107 148
108/** 149/**
109 * Are we allowed to push out content from this peer. 150 * Size of the doubly-linked list of migration blocks.
110 */ 151 */
111static int active_from_migration; 152static unsigned int mig_size;
112 153
113/** 154/**
114 * Size of the doubly-linked list of migration blocks. 155 * Is this module enabled?
115 */ 156 */
116static unsigned int mig_size; 157static int enabled;
117 158
118 159
119/** 160/**
@@ -135,134 +176,212 @@ delete_migration_block (struct MigrationReadyBlock *mb)
135 176
136 177
137/** 178/**
138 * Compare the distance of two peers to a key. 179 * Find content for migration to this peer.
180 */
181static void
182find_content (struct MigrationReadyPeer *mrp);
183
184
185/**
186 * Transmit the message currently scheduled for
187 * transmission.
139 * 188 *
140 * @param key key 189 * @param cls the 'struct MigrationReadyPeer'
141 * @param p1 first peer 190 * @param buf_size number of bytes available in buf
142 * @param p2 second peer 191 * @param buf where to copy the message, NULL on error (peer disconnect)
143 * @return GNUNET_YES if P1 is closer to key than P2 192 * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
144 */ 193 */
145static int 194static size_t
146is_closer (const GNUNET_HashCode *key, 195transmit_message (void *cls,
147 const struct GNUNET_PeerIdentity *p1, 196 size_t buf_size,
148 const struct GNUNET_PeerIdentity *p2) 197 void *buf)
149{ 198{
150 return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey, 199 struct MigrationReadyPeer *peer = cls;
151 &p2->hashPubKey, 200 struct PutMessage *msg;
152 key); 201 uint16_t msize;
202
203 peer->th = NULL;
204 msg = peer->msg;
205 peer->msg = NULL;
206 if (buf == NULL)
207 {
208 GNUNET_free (msg);
209 return 0;
210 }
211 msize = ntohs (msg->header.size);
212 GNUNET_assert (msize <= buf_size);
213 memcpy (buf, msg, msize);
214 GNUNET_free (msg);
215 find_content (peer);
216 return msize;
153} 217}
154 218
155 219
156/** 220/**
157 * Consider migrating content to a given peer. 221 * Send the given block to the given peer.
158 * 222 *
159 * @param cls 'struct MigrationReadyBlock*' to select 223 * @param peer target peer
160 * targets for (or NULL for none) 224 * @param block the block
161 * @param key ID of the peer 225 * @return GNUNET_YES if the block was deleted (!)
162 * @param value 'struct ConnectedPeer' of the peer
163 * @return GNUNET_YES (always continue iteration)
164 */ 226 */
165static int 227static int
166consider_migration (void *cls, 228transmit_content (struct MigrationReadyPeer *peer,
167 const GNUNET_HashCode *key, 229 struct MigrationReadyBlock *block)
168 void *value)
169{ 230{
170 struct MigrationReadyBlock *mb = cls;
171 struct ConnectedPeer *cp = value;
172 struct MigrationReadyBlock *pos;
173 struct GNUNET_PeerIdentity cppid;
174 struct GNUNET_PeerIdentity otherpid;
175 struct GNUNET_PeerIdentity worstpid;
176 size_t msize; 231 size_t msize;
177 unsigned int i; 232 struct PutMessage *msg;
178 unsigned int repl; 233 unsigned int i;
234 struct GSF_PeerPerformanceData *ppd;
235 int ret;
236
237 ppd = GSF_get_peer_performance_data (peer->peer);
238 GNUNET_assert (NULL == peer->th);
239 msize = sizeof (struct PutMessage) + block->size;
240 msg = GNUNET_malloc (msize);
241 msg->header.type = htons (42);
242 msg->header.size = htons (msize);
179 243
180 /* consider 'cp' as a migration target for mb */ 244 memcpy (&msg[1],
181 if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0) 245 &block[1],
182 return GNUNET_YES; /* peer has requested no migration! */ 246 block->size);
183 if (mb != NULL) 247 peer->msg = msg;
248 for (i=0;i<MIGRATION_LIST_SIZE;i++)
184 { 249 {
185 GNUNET_PEER_resolve (cp->pid, 250 if (block->target_list[i] == 0)
186 &cppid);
187 repl = MIGRATION_LIST_SIZE;
188 for (i=0;i<MIGRATION_LIST_SIZE;i++)
189 { 251 {
190 if (mb->target_list[i] == 0) 252 block->target_list[i] = ppd->pid;
191 { 253 GNUNET_PEER_change_rc (block->target_list[i], 1);
192 mb->target_list[i] = cp->pid; 254 break;
193 GNUNET_PEER_change_rc (mb->target_list[i], 1);
194 repl = MIGRATION_LIST_SIZE;
195 break;
196 }
197 GNUNET_PEER_resolve (mb->target_list[i],
198 &otherpid);
199 if ( (repl == MIGRATION_LIST_SIZE) &&
200 is_closer (&mb->query,
201 &cppid,
202 &otherpid))
203 {
204 repl = i;
205 worstpid = otherpid;
206 }
207 else if ( (repl != MIGRATION_LIST_SIZE) &&
208 (is_closer (&mb->query,
209 &worstpid,
210 &otherpid) ) )
211 {
212 repl = i;
213 worstpid = otherpid;
214 }
215 }
216 if (repl != MIGRATION_LIST_SIZE)
217 {
218 GNUNET_PEER_change_rc (mb->target_list[repl], -1);
219 mb->target_list[repl] = cp->pid;
220 GNUNET_PEER_change_rc (mb->target_list[repl], 1);
221 } 255 }
222 } 256 }
257 if (MIGRATION_LIST_SIZE == i)
258 {
259 delete_migration_block (block);
260 ret = GNUNET_YES;
261 }
262 else
263 {
264 ret = GNUNET_NO;
265 }
266 peer->th = GSF_peer_transmit_ (peer->peer,
267 GNUNET_NO,
268 0 /* priority */,
269 GNUNET_TIME_UNIT_FOREVER_REL,
270 msize,
271 &transmit_message,
272 peer);
273 return ret;
274}
223 275
224 /* consider scheduling transmission to cp for content migration */ 276
225 if (cp->cth != NULL) 277/**
226 return GNUNET_YES; 278 * Count the number of peers this block has
227 msize = 0; 279 * already been forwarded to.
228 pos = mig_head; 280 *
229 while (pos != NULL) 281 * @param block the block
282 * @return number of times block was forwarded
283 */
284static unsigned int
285count_targets (struct MigrationReadyBlock *block)
286{
287 unsigned int i;
288
289 for (i=0;i<MIGRATION_LIST_SIZE;i++)
290 if (block->target_list[i] == 0)
291 return i;
292 return i;
293}
294
295
296/**
297 * Check if sending this block to this peer would
298 * be a good idea.
299 *
300 * @param peer target peer
301 * @param block the block
302 * @return score (>= 0: feasible, negative: infeasible)
303 */
304static long
305score_content (struct MigrationReadyPeer *peer,
306 struct MigrationReadyBlock *block)
307{
308 unsigned int i;
309 struct GSF_PeerPerformanceData *ppd;
310 struct GNUNET_PeerIdentity id;
311 uint32_t dist;
312
313 ppd = GSF_get_peer_performance_data (peer->peer);
314 for (i=0;i<MIGRATION_LIST_SIZE;i++)
315 if (mb->target_list[i] == ppd->pid)
316 return -1;
317 GSF_connected_peer_get_identity (peer->peer,
318 &id);
319 dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
320 &id.hashPubKey);
321 /* closer distance, higher score: */
322 return UINT32_MAX - dist;
323}
324
325
326/**
327 * If the migration task is not currently running, consider
328 * (re)scheduling it with the appropriate delay.
329 */
330static void
331consider_gathering (void);
332
333
334/**
335 * Find content for migration to this peer.
336 *
337 * @param mig peer to find content for
338 */
339static void
340find_content (struct MigrationReadyPeer *mrp)
341{
342 struct MigrationReadyBlock *pos;
343 long score;
344 long best_score;
345 struct MigrationReadyBlock *best;
346
347 GNUNET_assert (NULL == mrp->th);
348 best = NULL;
349 best_score = -1;
350 pos = mig_qe;
351 while (NULL != pos)
230 { 352 {
231 for (i=0;i<MIGRATION_LIST_SIZE;i++) 353 score = score_content (mrp, pos);
354 if (score > best_score)
232 { 355 {
233 if (cp->pid == pos->target_list[i]) 356 best_score = score;
234 { 357 best = mrp;
235 if (msize == 0)
236 msize = pos->size;
237 else
238 msize = GNUNET_MIN (msize,
239 pos->size);
240 break;
241 }
242 } 358 }
243 pos = pos->next; 359 pos = pos->next;
244 } 360 }
245 if (msize == 0) 361 if ( (NULL == best) &&
246 return GNUNET_YES; /* no content available */ 362 (mig_size >= MAX_MIGRATION_QUEUE) )
247#if DEBUG_FS
248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
249 "Trying to migrate at least %u bytes to peer `%s'\n",
250 msize,
251 GNUNET_h2s (key));
252#endif
253 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
254 { 363 {
255 GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task); 364 /* failed to find migration target AND
256 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; 365 queue is full, purge most-forwarded
366 block from queue to make room for more */
367 score = 0;
368 pos = mig_qe;
369 while (NULL != pos)
370 {
371 score = count_targets (pos);
372 if (score >= best_score)
373 {
374 best_score = score;
375 best = mrp;
376 }
377 pos = pos->next;
378 }
379 GNUNET_assert (NULL != best);
380 delete_migration_block (best);
381 consider_gathering ();
382 return;
257 } 383 }
258 cp->cth 384 transmit_content (peer, best);
259 = GNUNET_CORE_notify_transmit_ready (core,
260 0, GNUNET_TIME_UNIT_FOREVER_REL,
261 (const struct GNUNET_PeerIdentity*) key,
262 msize + sizeof (struct PutMessage),
263 &transmit_to_peer,
264 cp);
265 return GNUNET_YES;
266} 385}
267 386
268 387
@@ -278,23 +397,23 @@ gather_migration_blocks (void *cls,
278 const struct GNUNET_SCHEDULER_TaskContext *tc); 397 const struct GNUNET_SCHEDULER_TaskContext *tc);
279 398
280 399
281
282
283/** 400/**
284 * If the migration task is not currently running, consider 401 * If the migration task is not currently running, consider
285 * (re)scheduling it with the appropriate delay. 402 * (re)scheduling it with the appropriate delay.
286 */ 403 */
287static void 404static void
288consider_migration_gathering () 405consider_gathering ()
289{ 406{
290 struct GNUNET_TIME_Relative delay; 407 struct GNUNET_TIME_Relative delay;
291 408
292 if (dsh == NULL) 409 if (GSF_dsh == NULL)
293 return; 410 return;
294 if (mig_qe != NULL) 411 if (mig_qe != NULL)
295 return; 412 return;
296 if (mig_task != GNUNET_SCHEDULER_NO_TASK) 413 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
297 return; 414 return;
415 if (mig_size >= MAX_MIGRATION_QUEUE)
416 return;
298 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 417 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
299 mig_size); 418 mig_size);
300 delay = GNUNET_TIME_relative_divide (delay, 419 delay = GNUNET_TIME_relative_divide (delay,
@@ -307,8 +426,6 @@ consider_migration_gathering ()
307} 426}
308 427
309 428
310
311
312/** 429/**
313 * Process content offered for migration. 430 * Process content offered for migration.
314 * 431 *
@@ -335,19 +452,19 @@ process_migration_content (void *cls,
335 expiration, uint64_t uid) 452 expiration, uint64_t uid)
336{ 453{
337 struct MigrationReadyBlock *mb; 454 struct MigrationReadyBlock *mb;
455 struct MigrationReadyPeer *pos;
338 456
339 if (key == NULL) 457 if (key == NULL)
340 { 458 {
341 mig_qe = NULL; 459 mig_qe = NULL;
342 if (mig_size < MAX_MIGRATION_QUEUE) 460 consider_gathering ();
343 consider_migration_gathering ();
344 return; 461 return;
345 } 462 }
346 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < 463 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
347 MIN_MIGRATION_CONTENT_LIFETIME.rel_value) 464 MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
348 { 465 {
349 /* content will expire soon, don't bother */ 466 /* content will expire soon, don't bother */
350 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 467 GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
351 return; 468 return;
352 } 469 }
353 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 470 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
@@ -359,7 +476,7 @@ process_migration_content (void *cls,
359 &process_migration_content, 476 &process_migration_content,
360 NULL)) 477 NULL))
361 { 478 {
362 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 479 GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
363 } 480 }
364 return; 481 return;
365 } 482 }
@@ -380,14 +497,20 @@ process_migration_content (void *cls,
380 mig_tail, 497 mig_tail,
381 mb); 498 mb);
382 mig_size++; 499 mig_size++;
383 GNUNET_CONTAINER_multihashmap_iterate (connected_peers, 500 pos = peer_head;
384 &consider_migration, 501 while (pos != NULL)
385 mb); 502 {
386 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 503 if (NULL == pos->th)
504 {
505 if (GNUNET_YES == transmit_content (pos, mb))
506 break; /* 'mb' was freed! */
507 }
508 pos = pos->next;
509 }
510 GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
387} 511}
388 512
389 513
390
391/** 514/**
392 * Task that is run periodically to obtain blocks for content 515 * Task that is run periodically to obtain blocks for content
393 * migration 516 * migration
@@ -400,9 +523,10 @@ gather_migration_blocks (void *cls,
400 const struct GNUNET_SCHEDULER_TaskContext *tc) 523 const struct GNUNET_SCHEDULER_TaskContext *tc)
401{ 524{
402 mig_task = GNUNET_SCHEDULER_NO_TASK; 525 mig_task = GNUNET_SCHEDULER_NO_TASK;
403 if (dsh != NULL) 526 if (GSF_dsh != NULL)
404 { 527 {
405 mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX, 528 mig_qe = GNUNET_DATASTORE_get_random (GSF_dsh,
529 0, UINT_MAX,
406 GNUNET_TIME_UNIT_FOREVER_REL, 530 GNUNET_TIME_UNIT_FOREVER_REL,
407 &process_migration_content, NULL); 531 &process_migration_content, NULL);
408 GNUNET_assert (mig_qe != NULL); 532 GNUNET_assert (mig_qe != NULL);
@@ -410,66 +534,108 @@ gather_migration_blocks (void *cls,
410} 534}
411 535
412 536
537/**
538 * A peer connected to us. Start pushing content
539 * to this peer.
540 *
541 * @param peer handle for the peer that connected
542 */
543void
544GSF_push_start_ (struct GSF_ConnectedPeer *peer)
545{
546 struct MigrationReadyPeer *mrp;
413 547
414size_t 548 if (GNUNET_YES != enabled)
415API_ (void *cls, 549 return;
416 size_t size, void *buf) 550 mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer));
551 mrp->peer = peer;
552 find_content (mrp);
553 GNUNET_CONTAINER_DLL_insert (peer_head,
554 peer_tail,
555 mrp);
556}
557
558
559/**
560 * A peer disconnected from us. Stop pushing content
561 * to this peer.
562 *
563 * @param peer handle for the peer that disconnected
564 */
565void
566GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
417{ 567{
418 next = mig_head; 568 struct MigrationReadyPeer *pos;
419 while (NULL != (mb = next)) 569
570 pos = peer_head;
571 while (pos != NULL)
572 {
573 if (pos->peer == peer)
420 { 574 {
421 next = mb->next; 575 GNUNET_CONTAINER_DLL_remove (peer_head,
422 for (i=0;i<MIGRATION_LIST_SIZE;i++) 576 peer_tail,
423 { 577 pos);
424 if ( (cp->pid == mb->target_list[i]) && 578 if (NULL != pos->th)
425 (mb->size + sizeof (migm) <= size) ) 579 GSF_peer_transmit_cancel_ (pos->th);
426 { 580 GNUNET_free (pos);
427 GNUNET_PEER_change_rc (mb->target_list[i], -1); 581 return;
428 mb->target_list[i] = 0;
429 mb->used_targets++;
430 memset (&migm, 0, sizeof (migm));
431 migm.header.size = htons (sizeof (migm) + mb->size);
432 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
433 migm.type = htonl (mb->type);
434 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
435 memcpy (&cbuf[msize], &migm, sizeof (migm));
436 msize += sizeof (migm);
437 size -= sizeof (migm);
438 memcpy (&cbuf[msize], &mb[1], mb->size);
439 msize += mb->size;
440 size -= mb->size;
441#if DEBUG_FS
442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443 "Pushing migration block `%s' (%u bytes) to `%s'\n",
444 GNUNET_h2s (&mb->query),
445 (unsigned int) mb->size,
446 GNUNET_i2s (&pid));
447#endif
448 break;
449 }
450 else
451 {
452#if DEBUG_FS
453 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
454 "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
455 GNUNET_h2s (&mb->query),
456 (unsigned int) mb->size,
457 GNUNET_i2s (&pid));
458#endif
459 }
460 }
461 if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
462 (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
463 {
464 delete_migration_block (mb);
465 consider_migration_gathering ();
466 }
467 } 582 }
468 consider_migration (NULL, 583 pos = pos->next;
469 &pid.hashPubKey, 584 }
470 cp); 585}
586
471 587
588/**
589 * Setup the module.
590 *
591 * @param cfg configuration to use
592 */
593void
594GSF_push_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
595{
596 int enabled;
597
598 enabled = GNUNET_CONFIGURATION_get_value_yesno (cfg,
599 "FS",
600 "CONTENT_PUSHING");
601 if (GNUNET_YES != enabled)
602 return;
603
604 if (GNUNET_OK !=
605 GNUNET_CONFIGURATION_get_value_time (cfg,
606 "fs",
607 "MIN_MIGRATION_DELAY",
608 &min_migration_delay))
609 {
610 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
611 _("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"),
612 "MIN_MIGRATION_DELAY",
613 "fs");
614 return;
615 }
616 consider_gathering ();
472} 617}
473 618
474 619
620/**
621 * Shutdown the module.
622 */
623void
624GSF_push_done_ ()
625{
626 if (GNUNET_SCHEDULER_NO_TASK != mig_task)
627 {
628 GNUNET_SCHEDULER_cancel (mig_task);
629 mig_task = GNUNET_SCHEDULER_NO_TASK;
630 }
631 if (NULL != mig_qe)
632 {
633 GNUNET_DATASTORE_cancel (mig_qe);
634 mig_qe = NULL;
635 }
636 while (NULL != mig_head)
637 delete_migration_block (mig_head);
638 GNUNET_assert (0 == mig_size);
639}
475 640
641/* end of gnunet-service-fs_push.c */