summaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_push.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-02-10 12:59:38 +0000
committerChristian Grothoff <christian@grothoff.org>2011-02-10 12:59:38 +0000
commitf54389f6724ecbd39389d53fba7b3bfdb2e0a8eb (patch)
tree11a7156180b22e4eaf784f5b1e400261c00e3ef9 /src/fs/gnunet-service-fs_push.c
parent3a39cd4cd22e345733ba225e7a4c0b6eecdad7df (diff)
downloadgnunet-f54389f6724ecbd39389d53fba7b3bfdb2e0a8eb.tar.gz
gnunet-f54389f6724ecbd39389d53fba7b3bfdb2e0a8eb.zip
stuff
Diffstat (limited to 'src/fs/gnunet-service-fs_push.c')
-rw-r--r--src/fs/gnunet-service-fs_push.c475
1 files changed, 475 insertions, 0 deletions
diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c
new file mode 100644
index 000000000..9f515e2ee
--- /dev/null
+++ b/src/fs/gnunet-service-fs_push.c
@@ -0,0 +1,475 @@
1/*
2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file fs/gnunet-service-fs_push.c
23 * @brief API to push content from our datastore to other peers
24 * ('anonymous'-content P2P migration)
25 * @author Christian Grothoff
26 */
27#include "platform.h"
28#include "gnunet-service-fs_push.h"
29
30
31/* FIXME: below are only old code fragments to use... */
32
33/**
34 * Block that is ready for migration to other peers. Actual data is at the end of the block.
35 */
36struct MigrationReadyBlock
37{
38
39 /**
40 * This is a doubly-linked list.
41 */
42 struct MigrationReadyBlock *next;
43
44 /**
45 * This is a doubly-linked list.
46 */
47 struct MigrationReadyBlock *prev;
48
49 /**
50 * Query for the block.
51 */
52 GNUNET_HashCode query;
53
54 /**
55 * When does this block expire?
56 */
57 struct GNUNET_TIME_Absolute expiration;
58
59 /**
60 * Peers we would consider forwarding this
61 * block to. Zero for empty entries.
62 */
63 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
64
65 /**
66 * Size of the block.
67 */
68 size_t size;
69
70 /**
71 * Number of targets already used.
72 */
73 unsigned int used_targets;
74
75 /**
76 * Type of the block.
77 */
78 enum GNUNET_BLOCK_Type type;
79};
80
81
82/**
83 * Head of linked list of blocks that can be migrated.
84 */
85static struct MigrationReadyBlock *mig_head;
86
87/**
88 * Tail of linked list of blocks that can be migrated.
89 */
90static struct MigrationReadyBlock *mig_tail;
91
92/**
93 * Request to datastore for migration (or NULL).
94 */
95static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
96
97/**
98 * ID of task that collects blocks for migration.
99 */
100static GNUNET_SCHEDULER_TaskIdentifier mig_task;
101
102/**
103 * What is the maximum frequency at which we are allowed to
104 * poll the datastore for migration content?
105 */
106static struct GNUNET_TIME_Relative min_migration_delay;
107
108/**
109 * Are we allowed to push out content from this peer.
110 */
111static int active_from_migration;
112
113/**
114 * Size of the doubly-linked list of migration blocks.
115 */
116static unsigned int mig_size;
117
118
119/**
120 * Delete the given migration block.
121 *
122 * @param mb block to delete
123 */
124static void
125delete_migration_block (struct MigrationReadyBlock *mb)
126{
127 GNUNET_CONTAINER_DLL_remove (mig_head,
128 mig_tail,
129 mb);
130 GNUNET_PEER_decrement_rcs (mb->target_list,
131 MIGRATION_LIST_SIZE);
132 mig_size--;
133 GNUNET_free (mb);
134}
135
136
137/**
138 * Compare the distance of two peers to a key.
139 *
140 * @param key key
141 * @param p1 first peer
142 * @param p2 second peer
143 * @return GNUNET_YES if P1 is closer to key than P2
144 */
145static int
146is_closer (const GNUNET_HashCode *key,
147 const struct GNUNET_PeerIdentity *p1,
148 const struct GNUNET_PeerIdentity *p2)
149{
150 return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
151 &p2->hashPubKey,
152 key);
153}
154
155
156/**
157 * Consider migrating content to a given peer.
158 *
159 * @param cls 'struct MigrationReadyBlock*' to select
160 * targets for (or NULL for none)
161 * @param key ID of the peer
162 * @param value 'struct ConnectedPeer' of the peer
163 * @return GNUNET_YES (always continue iteration)
164 */
165static int
166consider_migration (void *cls,
167 const GNUNET_HashCode *key,
168 void *value)
169{
170 struct MigrationReadyBlock *mb = cls;
171 struct ConnectedPeer *cp = value;
172 struct MigrationReadyBlock *pos;
173 struct GNUNET_PeerIdentity cppid;
174 struct GNUNET_PeerIdentity otherpid;
175 struct GNUNET_PeerIdentity worstpid;
176 size_t msize;
177 unsigned int i;
178 unsigned int repl;
179
180 /* consider 'cp' as a migration target for mb */
181 if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
182 return GNUNET_YES; /* peer has requested no migration! */
183 if (mb != NULL)
184 {
185 GNUNET_PEER_resolve (cp->pid,
186 &cppid);
187 repl = MIGRATION_LIST_SIZE;
188 for (i=0;i<MIGRATION_LIST_SIZE;i++)
189 {
190 if (mb->target_list[i] == 0)
191 {
192 mb->target_list[i] = cp->pid;
193 GNUNET_PEER_change_rc (mb->target_list[i], 1);
194 repl = MIGRATION_LIST_SIZE;
195 break;
196 }
197 GNUNET_PEER_resolve (mb->target_list[i],
198 &otherpid);
199 if ( (repl == MIGRATION_LIST_SIZE) &&
200 is_closer (&mb->query,
201 &cppid,
202 &otherpid))
203 {
204 repl = i;
205 worstpid = otherpid;
206 }
207 else if ( (repl != MIGRATION_LIST_SIZE) &&
208 (is_closer (&mb->query,
209 &worstpid,
210 &otherpid) ) )
211 {
212 repl = i;
213 worstpid = otherpid;
214 }
215 }
216 if (repl != MIGRATION_LIST_SIZE)
217 {
218 GNUNET_PEER_change_rc (mb->target_list[repl], -1);
219 mb->target_list[repl] = cp->pid;
220 GNUNET_PEER_change_rc (mb->target_list[repl], 1);
221 }
222 }
223
224 /* consider scheduling transmission to cp for content migration */
225 if (cp->cth != NULL)
226 return GNUNET_YES;
227 msize = 0;
228 pos = mig_head;
229 while (pos != NULL)
230 {
231 for (i=0;i<MIGRATION_LIST_SIZE;i++)
232 {
233 if (cp->pid == pos->target_list[i])
234 {
235 if (msize == 0)
236 msize = pos->size;
237 else
238 msize = GNUNET_MIN (msize,
239 pos->size);
240 break;
241 }
242 }
243 pos = pos->next;
244 }
245 if (msize == 0)
246 return GNUNET_YES; /* no content available */
247#if DEBUG_FS
248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
249 "Trying to migrate at least %u bytes to peer `%s'\n",
250 msize,
251 GNUNET_h2s (key));
252#endif
253 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
254 {
255 GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
256 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
257 }
258 cp->cth
259 = GNUNET_CORE_notify_transmit_ready (core,
260 0, GNUNET_TIME_UNIT_FOREVER_REL,
261 (const struct GNUNET_PeerIdentity*) key,
262 msize + sizeof (struct PutMessage),
263 &transmit_to_peer,
264 cp);
265 return GNUNET_YES;
266}
267
268
269/**
270 * Task that is run periodically to obtain blocks for content
271 * migration
272 *
273 * @param cls unused
274 * @param tc scheduler context (also unused)
275 */
276static void
277gather_migration_blocks (void *cls,
278 const struct GNUNET_SCHEDULER_TaskContext *tc);
279
280
281
282
283/**
284 * If the migration task is not currently running, consider
285 * (re)scheduling it with the appropriate delay.
286 */
287static void
288consider_migration_gathering ()
289{
290 struct GNUNET_TIME_Relative delay;
291
292 if (dsh == NULL)
293 return;
294 if (mig_qe != NULL)
295 return;
296 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
297 return;
298 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
299 mig_size);
300 delay = GNUNET_TIME_relative_divide (delay,
301 MAX_MIGRATION_QUEUE);
302 delay = GNUNET_TIME_relative_max (delay,
303 min_migration_delay);
304 mig_task = GNUNET_SCHEDULER_add_delayed (delay,
305 &gather_migration_blocks,
306 NULL);
307}
308
309
310
311
312/**
313 * Process content offered for migration.
314 *
315 * @param cls closure
316 * @param key key for the content
317 * @param size number of bytes in data
318 * @param data content stored
319 * @param type type of the content
320 * @param priority priority of the content
321 * @param anonymity anonymity-level for the content
322 * @param expiration expiration time for the content
323 * @param uid unique identifier for the datum;
324 * maybe 0 if no unique identifier is available
325 */
326static void
327process_migration_content (void *cls,
328 const GNUNET_HashCode * key,
329 size_t size,
330 const void *data,
331 enum GNUNET_BLOCK_Type type,
332 uint32_t priority,
333 uint32_t anonymity,
334 struct GNUNET_TIME_Absolute
335 expiration, uint64_t uid)
336{
337 struct MigrationReadyBlock *mb;
338
339 if (key == NULL)
340 {
341 mig_qe = NULL;
342 if (mig_size < MAX_MIGRATION_QUEUE)
343 consider_migration_gathering ();
344 return;
345 }
346 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
347 MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
348 {
349 /* content will expire soon, don't bother */
350 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
351 return;
352 }
353 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
354 {
355 if (GNUNET_OK !=
356 GNUNET_FS_handle_on_demand_block (key, size, data,
357 type, priority, anonymity,
358 expiration, uid,
359 &process_migration_content,
360 NULL))
361 {
362 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
363 }
364 return;
365 }
366#if DEBUG_FS
367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368 "Retrieved block `%s' of type %u for migration\n",
369 GNUNET_h2s (key),
370 type);
371#endif
372 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
373 mb->query = *key;
374 mb->expiration = expiration;
375 mb->size = size;
376 mb->type = type;
377 memcpy (&mb[1], data, size);
378 GNUNET_CONTAINER_DLL_insert_after (mig_head,
379 mig_tail,
380 mig_tail,
381 mb);
382 mig_size++;
383 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
384 &consider_migration,
385 mb);
386 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
387}
388
389
390
391/**
392 * Task that is run periodically to obtain blocks for content
393 * migration
394 *
395 * @param cls unused
396 * @param tc scheduler context (also unused)
397 */
398static void
399gather_migration_blocks (void *cls,
400 const struct GNUNET_SCHEDULER_TaskContext *tc)
401{
402 mig_task = GNUNET_SCHEDULER_NO_TASK;
403 if (dsh != NULL)
404 {
405 mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
406 GNUNET_TIME_UNIT_FOREVER_REL,
407 &process_migration_content, NULL);
408 GNUNET_assert (mig_qe != NULL);
409 }
410}
411
412
413
414size_t
415API_ (void *cls,
416 size_t size, void *buf)
417{
418 next = mig_head;
419 while (NULL != (mb = next))
420 {
421 next = mb->next;
422 for (i=0;i<MIGRATION_LIST_SIZE;i++)
423 {
424 if ( (cp->pid == mb->target_list[i]) &&
425 (mb->size + sizeof (migm) <= size) )
426 {
427 GNUNET_PEER_change_rc (mb->target_list[i], -1);
428 mb->target_list[i] = 0;
429 mb->used_targets++;
430 memset (&migm, 0, sizeof (migm));
431 migm.header.size = htons (sizeof (migm) + mb->size);
432 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
433 migm.type = htonl (mb->type);
434 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
435 memcpy (&cbuf[msize], &migm, sizeof (migm));
436 msize += sizeof (migm);
437 size -= sizeof (migm);
438 memcpy (&cbuf[msize], &mb[1], mb->size);
439 msize += mb->size;
440 size -= mb->size;
441#if DEBUG_FS
442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443 "Pushing migration block `%s' (%u bytes) to `%s'\n",
444 GNUNET_h2s (&mb->query),
445 (unsigned int) mb->size,
446 GNUNET_i2s (&pid));
447#endif
448 break;
449 }
450 else
451 {
452#if DEBUG_FS
453 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
454 "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
455 GNUNET_h2s (&mb->query),
456 (unsigned int) mb->size,
457 GNUNET_i2s (&pid));
458#endif
459 }
460 }
461 if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
462 (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
463 {
464 delete_migration_block (mb);
465 consider_migration_gathering ();
466 }
467 }
468 consider_migration (NULL,
469 &pid.hashPubKey,
470 cp);
471
472}
473
474
475