aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/defaults.conf2
-rw-r--r--src/fs/fs.h6
-rw-r--r--src/fs/gnunet-service-fs_cp.c77
-rw-r--r--src/fs/gnunet-service-fs_cp.h5
-rw-r--r--src/fs/gnunet-service-fs_push.c530
-rw-r--r--src/fs/gnunet-service-fs_push.h68
-rw-r--r--src/fs/gnunet-service-fs_put.c41
-rw-r--r--src/fs/gnunet-service-fs_put.h48
8 files changed, 565 insertions, 212 deletions
diff --git a/contrib/defaults.conf b/contrib/defaults.conf
index 3848d4af6..f0ee4e3e0 100644
--- a/contrib/defaults.conf
+++ b/contrib/defaults.conf
@@ -311,7 +311,7 @@ MAX_PENDING_REQUESTS = 65536
311# Maximum frequency we're allowed to poll the datastore 311# Maximum frequency we're allowed to poll the datastore
312# for content for migration (can be used to reduce 312# for content for migration (can be used to reduce
313# GNUnet's disk-IO rate) 313# GNUnet's disk-IO rate)
314MIN_MIGRATION_DELAY = 1000 314MIN_MIGRATION_DELAY = 100
315EXPECTED_NEIGHBOUR_COUNT = 128 315EXPECTED_NEIGHBOUR_COUNT = 128
316 316
317[dht] 317[dht]
diff --git a/src/fs/fs.h b/src/fs/fs.h
index a5b15dcef..ff769cfca 100644
--- a/src/fs/fs.h
+++ b/src/fs/fs.h
@@ -65,10 +65,10 @@
65#define MAX_MIGRATION_QUEUE 32 65#define MAX_MIGRATION_QUEUE 32
66 66
67/** 67/**
68 * How many peers do we select as possible 68 * Blocks are at most migrated to this number of peers
69 * targets per block obtained for migration? 69 * plus one, each time they are fetched from the database.
70 */ 70 */
71#define MIGRATION_LIST_SIZE 4 71#define MIGRATION_LIST_SIZE 2
72 72
73/** 73/**
74 * To how many peers do we forward each migration block ultimately? 74 * To how many peers do we forward each migration block ultimately?
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 3ce03be2e..be041c861 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -26,6 +26,7 @@
26#include "platform.h" 26#include "platform.h"
27#include "gnunet-service-fs.h" 27#include "gnunet-service-fs.h"
28#include "gnunet-service-fs_cp.h" 28#include "gnunet-service-fs_cp.h"
29#include "gnunet-service-fs_push.h"
29 30
30/** 31/**
31 * How often do we flush trust values to disk? 32 * How often do we flush trust values to disk?
@@ -116,6 +117,11 @@ struct GSF_ConnectedPeer
116 struct GNUNET_TIME_Absolute last_migration_block; 117 struct GNUNET_TIME_Absolute last_migration_block;
117 118
118 /** 119 /**
120 * Task scheduled to revive migration to this peer.
121 */
122 struct GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
123
124 /**
119 * Messages (replies, queries, content migration) we would like to 125 * Messages (replies, queries, content migration) we would like to
120 * send to this peer in the near future. Sorted by priority, head. 126 * send to this peer in the near future. Sorted by priority, head.
121 */ 127 */
@@ -144,11 +150,6 @@ struct GSF_ConnectedPeer
144 struct GNUNET_CONTAINER_MulitHashMap *request_map; 150 struct GNUNET_CONTAINER_MulitHashMap *request_map;
145 151
146 /** 152 /**
147 * ID of delay task for scheduling transmission.
148 */
149 GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: used in 'push' (ugh!)
150
151 /**
152 * Increase in traffic preference still to be submitted 153 * Increase in traffic preference still to be submitted
153 * to the core service for this peer. 154 * to the core service for this peer.
154 */ 155 */
@@ -160,11 +161,6 @@ struct GSF_ConnectedPeer
160 uint32_t disk_trust; 161 uint32_t disk_trust;
161 162
162 /** 163 /**
163 * The peer's identity.
164 */
165 GNUNET_PEER_Id pid;
166
167 /**
168 * Which offset in "last_p2p_replies" will be updated next? 164 * Which offset in "last_p2p_replies" will be updated next?
169 * (we go round-robin). 165 * (we go round-robin).
170 */ 166 */
@@ -388,7 +384,7 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
388 384
389 cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer)); 385 cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
390 cp->transmission_delay = GNUNET_LOAD_value_init (latency); 386 cp->transmission_delay = GNUNET_LOAD_value_init (latency);
391 cp->pid = GNUNET_PEER_intern (peer); 387 cp->ppd.pid = GNUNET_PEER_intern (peer);
392 cp->transmission_delay = GNUNET_LOAD_value_init (0); 388 cp->transmission_delay = GNUNET_LOAD_value_init (0);
393 cp->irc = GNUNET_CORE_peer_change_preference (core, 389 cp->irc = GNUNET_CORE_peer_change_preference (core,
394 peer, 390 peer,
@@ -411,11 +407,41 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
411 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 407 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
412 update_atsi (cp, atsi); 408 update_atsi (cp, atsi);
413 GSF_plan_notify_new_peer_ (cp); 409 GSF_plan_notify_new_peer_ (cp);
410 GSF_push_start_ (cp);
414 return cp; 411 return cp;
415} 412}
416 413
417 414
418/** 415/**
416 * It may be time to re-start migrating content to this
417 * peer. Check, and if so, restart migration.
418 *
419 * @param cls the 'struct GSF_ConnectedPeer'
420 * @param tc scheduler context
421 */
422static void
423revive_migration (void *cls,
424 const struct GNUNET_SCHEDULER_TaskContext *tc)
425{
426 struct GSF_ConnectedPeer *cp = cls;
427 struct GNUNET_TIME_Relative bt;
428
429 cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
430 bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
431 if (0 != bt.rel_value)
432 {
433 /* still time left... */
434 cp->mig_revive_task
435 = GNUNET_SCHEDULER_add_delayed (bt,
436 &revive_migration,
437 cp);
438 return;
439 }
440 GSF_push_start_ (cp);
441}
442
443
444/**
419 * Handle P2P "MIGRATION_STOP" message. 445 * Handle P2P "MIGRATION_STOP" message.
420 * 446 *
421 * @param cls closure, always NULL 447 * @param cls closure, always NULL
@@ -434,6 +460,7 @@ GSF_handle_p2p_migration_stop_ (void *cls,
434{ 460{
435 struct GSF_ConnectedPeer *cp; 461 struct GSF_ConnectedPeer *cp;
436 const struct MigrationStopMessage *msm; 462 const struct MigrationStopMessage *msm;
463 struct GNUNET_TIME_Relative bt;
437 464
438 msm = (const struct MigrationStopMessage*) message; 465 msm = (const struct MigrationStopMessage*) message;
439 cp = GNUNET_CONTAINER_multihashmap_get (cp_map, 466 cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
@@ -443,7 +470,16 @@ GSF_handle_p2p_migration_stop_ (void *cls,
443 GNUNET_break (0); 470 GNUNET_break (0);
444 return GNUNET_OK; 471 return GNUNET_OK;
445 } 472 }
446 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration)); 473 bt = GNUNET_TIME_relative_ntoh (msm->duration);
474 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
475 if (cp->mig_revive_task == GNUNET_SCHEDULER_NO_TASK)
476 {
477 GSF_push_stop_ (cp);
478 cp->mig_revive_task
479 = GNUNET_SCHEDULER_add_delayed (bt,
480 &revive_migration,
481 cp);
482 }
447 update_atsi (cp, atsi); 483 update_atsi (cp, atsi);
448 return GNUNET_OK; 484 return GNUNET_OK;
449} 485}
@@ -880,7 +916,7 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
880 cp->pth_tail, 916 cp->pth_tail,
881 prev, 917 prev,
882 pth); 918 pth);
883 GNUNET_PEER_resolve (cp->pid, 919 GNUNET_PEER_resolve (cp->ppd.pid,
884 &target); 920 &target);
885 if (GNUNET_YES == is_query) 921 if (GNUNET_YES == is_query)
886 { 922 {
@@ -1022,8 +1058,8 @@ GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1022 const struct GSF_ConnectedPeer *initiator_peer) 1058 const struct GSF_ConnectedPeer *initiator_peer)
1023{ 1059{
1024 GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1); 1060 GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
1025 cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->pid; 1061 cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->ppd.pid;
1026 GNUNET_PEER_change_rc (initiator_peer->pid, 1); 1062 GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1027} 1063}
1028 1064
1029 1065
@@ -1125,7 +1161,12 @@ GSF_peer_disconnect_handler_ (void *cls,
1125 pth); 1161 pth);
1126 GNUNET_free (pth); 1162 GNUNET_free (pth);
1127 } 1163 }
1128 GNUNET_PEER_change_rc (cp->pid, -1); 1164 GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1165 if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
1166 {
1167 GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1168 cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
1169 }
1129 GNUNET_free (cp); 1170 GNUNET_free (cp);
1130} 1171}
1131 1172
@@ -1201,7 +1242,7 @@ void
1201GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp, 1242GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1202 struct GNUNET_PeerIdentity *id) 1243 struct GNUNET_PeerIdentity *id)
1203{ 1244{
1204 GNUNET_PEER_resolve (cp->pid, 1245 GNUNET_PEER_resolve (cp->ppd.pid,
1205 &id); 1246 &id);
1206} 1247}
1207 1248
@@ -1281,7 +1322,7 @@ flush_trust (void *cls,
1281 1322
1282 if (cp->trust == cp->disk_trust) 1323 if (cp->trust == cp->disk_trust)
1283 return GNUNET_OK; /* unchanged */ 1324 return GNUNET_OK; /* unchanged */
1284 GNUNET_PEER_resolve (cp->pid, 1325 GNUNET_PEER_resolve (cp->ppd.pid,
1285 &pid); 1326 &pid);
1286 fn = get_trust_filename (&pid); 1327 fn = get_trust_filename (&pid);
1287 if (cp->trust == 0) 1328 if (cp->trust == 0)
diff --git a/src/fs/gnunet-service-fs_cp.h b/src/fs/gnunet-service-fs_cp.h
index 081a1d5ba..48a019712 100644
--- a/src/fs/gnunet-service-fs_cp.h
+++ b/src/fs/gnunet-service-fs_cp.h
@@ -90,6 +90,11 @@ struct GSF_PeerPerformanceData
90 double avg_priority; 90 double avg_priority;
91 91
92 /** 92 /**
93 * The peer's identity.
94 */
95 GNUNET_PEER_Id pid;
96
97 /**
93 * Trust rating for this peer 98 * Trust rating for this peer
94 */ 99 */
95 uint32_t trust; 100 uint32_t trust;
diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c
index 9f515e2ee..2180a520d 100644
--- a/src/fs/gnunet-service-fs_push.c
+++ b/src/fs/gnunet-service-fs_push.c
@@ -27,9 +27,6 @@
27#include "platform.h" 27#include "platform.h"
28#include "gnunet-service-fs_push.h" 28#include "gnunet-service-fs_push.h"
29 29
30
31/* FIXME: below are only old code fragments to use... */
32
33/** 30/**
34 * Block that is ready for migration to other peers. Actual data is at the end of the block. 31 * Block that is ready for migration to other peers. Actual data is at the end of the block.
35 */ 32 */
@@ -57,7 +54,7 @@ struct MigrationReadyBlock
57 struct GNUNET_TIME_Absolute expiration; 54 struct GNUNET_TIME_Absolute expiration;
58 55
59 /** 56 /**
60 * Peers we would consider forwarding this 57 * Peers we already forwarded this
61 * block to. Zero for empty entries. 58 * block to. Zero for empty entries.
62 */ 59 */
63 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE]; 60 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
@@ -80,6 +77,40 @@ struct MigrationReadyBlock
80 77
81 78
82/** 79/**
80 * Information about a peer waiting for
81 * migratable data.
82 */
83struct MigrationReadyPeer
84{
85 /**
86 * This is a doubly-linked list.
87 */
88 struct MigrationReadyPeer *next;
89
90 /**
91 * This is a doubly-linked list.
92 */
93 struct MigrationReadyPeer *prev;
94
95 /**
96 * Handle to peer.
97 */
98 struct GSF_ConnectedPeer *peer;
99
100 /**
101 * Handle for current transmission request,
102 * or NULL for none.
103 */
104 struct GSF_PeerTransmitHandle *th;
105
106 /**
107 * Message we are trying to push right now (or NULL)
108 */
109 struct PutMessage *msg;
110};
111
112
113/**
83 * Head of linked list of blocks that can be migrated. 114 * Head of linked list of blocks that can be migrated.
84 */ 115 */
85static struct MigrationReadyBlock *mig_head; 116static struct MigrationReadyBlock *mig_head;
@@ -90,6 +121,16 @@ static struct MigrationReadyBlock *mig_head;
90static struct MigrationReadyBlock *mig_tail; 121static struct MigrationReadyBlock *mig_tail;
91 122
92/** 123/**
124 * Head of linked list of peers.
125 */
126static struct MigrationReadyPeer *peer_head;
127
128/**
129 * Tail of linked list of peers.
130 */
131static struct MigrationReadyPeer *peer_tail;
132
133/**
93 * Request to datastore for migration (or NULL). 134 * Request to datastore for migration (or NULL).
94 */ 135 */
95static struct GNUNET_DATASTORE_QueueEntry *mig_qe; 136static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
@@ -106,14 +147,14 @@ static GNUNET_SCHEDULER_TaskIdentifier mig_task;
106static struct GNUNET_TIME_Relative min_migration_delay; 147static struct GNUNET_TIME_Relative min_migration_delay;
107 148
108/** 149/**
109 * Are we allowed to push out content from this peer. 150 * Size of the doubly-linked list of migration blocks.
110 */ 151 */
111static int active_from_migration; 152static unsigned int mig_size;
112 153
113/** 154/**
114 * Size of the doubly-linked list of migration blocks. 155 * Is this module enabled?
115 */ 156 */
116static unsigned int mig_size; 157static int enabled;
117 158
118 159
119/** 160/**
@@ -135,134 +176,212 @@ delete_migration_block (struct MigrationReadyBlock *mb)
135 176
136 177
137/** 178/**
138 * Compare the distance of two peers to a key. 179 * Find content for migration to this peer.
180 */
181static void
182find_content (struct MigrationReadyPeer *mrp);
183
184
185/**
186 * Transmit the message currently scheduled for
187 * transmission.
139 * 188 *
140 * @param key key 189 * @param cls the 'struct MigrationReadyPeer'
141 * @param p1 first peer 190 * @param buf_size number of bytes available in buf
142 * @param p2 second peer 191 * @param buf where to copy the message, NULL on error (peer disconnect)
143 * @return GNUNET_YES if P1 is closer to key than P2 192 * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
144 */ 193 */
145static int 194static size_t
146is_closer (const GNUNET_HashCode *key, 195transmit_message (void *cls,
147 const struct GNUNET_PeerIdentity *p1, 196 size_t buf_size,
148 const struct GNUNET_PeerIdentity *p2) 197 void *buf)
149{ 198{
150 return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey, 199 struct MigrationReadyPeer *peer = cls;
151 &p2->hashPubKey, 200 struct PutMessage *msg;
152 key); 201 uint16_t msize;
202
203 peer->th = NULL;
204 msg = peer->msg;
205 peer->msg = NULL;
206 if (buf == NULL)
207 {
208 GNUNET_free (msg);
209 return 0;
210 }
211 msize = ntohs (msg->header.size);
212 GNUNET_assert (msize <= buf_size);
213 memcpy (buf, msg, msize);
214 GNUNET_free (msg);
215 find_content (peer);
216 return msize;
153} 217}
154 218
155 219
156/** 220/**
157 * Consider migrating content to a given peer. 221 * Send the given block to the given peer.
158 * 222 *
159 * @param cls 'struct MigrationReadyBlock*' to select 223 * @param peer target peer
160 * targets for (or NULL for none) 224 * @param block the block
161 * @param key ID of the peer 225 * @return GNUNET_YES if the block was deleted (!)
162 * @param value 'struct ConnectedPeer' of the peer
163 * @return GNUNET_YES (always continue iteration)
164 */ 226 */
165static int 227static int
166consider_migration (void *cls, 228transmit_content (struct MigrationReadyPeer *peer,
167 const GNUNET_HashCode *key, 229 struct MigrationReadyBlock *block)
168 void *value)
169{ 230{
170 struct MigrationReadyBlock *mb = cls;
171 struct ConnectedPeer *cp = value;
172 struct MigrationReadyBlock *pos;
173 struct GNUNET_PeerIdentity cppid;
174 struct GNUNET_PeerIdentity otherpid;
175 struct GNUNET_PeerIdentity worstpid;
176 size_t msize; 231 size_t msize;
177 unsigned int i; 232 struct PutMessage *msg;
178 unsigned int repl; 233 unsigned int i;
234 struct GSF_PeerPerformanceData *ppd;
235 int ret;
236
237 ppd = GSF_get_peer_performance_data (peer->peer);
238 GNUNET_assert (NULL == peer->th);
239 msize = sizeof (struct PutMessage) + block->size;
240 msg = GNUNET_malloc (msize);
241 msg->header.type = htons (42);
242 msg->header.size = htons (msize);
179 243
180 /* consider 'cp' as a migration target for mb */ 244 memcpy (&msg[1],
181 if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0) 245 &block[1],
182 return GNUNET_YES; /* peer has requested no migration! */ 246 block->size);
183 if (mb != NULL) 247 peer->msg = msg;
248 for (i=0;i<MIGRATION_LIST_SIZE;i++)
184 { 249 {
185 GNUNET_PEER_resolve (cp->pid, 250 if (block->target_list[i] == 0)
186 &cppid);
187 repl = MIGRATION_LIST_SIZE;
188 for (i=0;i<MIGRATION_LIST_SIZE;i++)
189 { 251 {
190 if (mb->target_list[i] == 0) 252 block->target_list[i] = ppd->pid;
191 { 253 GNUNET_PEER_change_rc (block->target_list[i], 1);
192 mb->target_list[i] = cp->pid; 254 break;
193 GNUNET_PEER_change_rc (mb->target_list[i], 1);
194 repl = MIGRATION_LIST_SIZE;
195 break;
196 }
197 GNUNET_PEER_resolve (mb->target_list[i],
198 &otherpid);
199 if ( (repl == MIGRATION_LIST_SIZE) &&
200 is_closer (&mb->query,
201 &cppid,
202 &otherpid))
203 {
204 repl = i;
205 worstpid = otherpid;
206 }
207 else if ( (repl != MIGRATION_LIST_SIZE) &&
208 (is_closer (&mb->query,
209 &worstpid,
210 &otherpid) ) )
211 {
212 repl = i;
213 worstpid = otherpid;
214 }
215 }
216 if (repl != MIGRATION_LIST_SIZE)
217 {
218 GNUNET_PEER_change_rc (mb->target_list[repl], -1);
219 mb->target_list[repl] = cp->pid;
220 GNUNET_PEER_change_rc (mb->target_list[repl], 1);
221 } 255 }
222 } 256 }
257 if (MIGRATION_LIST_SIZE == i)
258 {
259 delete_migration_block (block);
260 ret = GNUNET_YES;
261 }
262 else
263 {
264 ret = GNUNET_NO;
265 }
266 peer->th = GSF_peer_transmit_ (peer->peer,
267 GNUNET_NO,
268 0 /* priority */,
269 GNUNET_TIME_UNIT_FOREVER_REL,
270 msize,
271 &transmit_message,
272 peer);
273 return ret;
274}
223 275
224 /* consider scheduling transmission to cp for content migration */ 276
225 if (cp->cth != NULL) 277/**
226 return GNUNET_YES; 278 * Count the number of peers this block has
227 msize = 0; 279 * already been forwarded to.
228 pos = mig_head; 280 *
229 while (pos != NULL) 281 * @param block the block
282 * @return number of times block was forwarded
283 */
284static unsigned int
285count_targets (struct MigrationReadyBlock *block)
286{
287 unsigned int i;
288
289 for (i=0;i<MIGRATION_LIST_SIZE;i++)
290 if (block->target_list[i] == 0)
291 return i;
292 return i;
293}
294
295
296/**
297 * Check if sending this block to this peer would
298 * be a good idea.
299 *
300 * @param peer target peer
301 * @param block the block
302 * @return score (>= 0: feasible, negative: infeasible)
303 */
304static long
305score_content (struct MigrationReadyPeer *peer,
306 struct MigrationReadyBlock *block)
307{
308 unsigned int i;
309 struct GSF_PeerPerformanceData *ppd;
310 struct GNUNET_PeerIdentity id;
311 uint32_t dist;
312
313 ppd = GSF_get_peer_performance_data (peer->peer);
314 for (i=0;i<MIGRATION_LIST_SIZE;i++)
315 if (mb->target_list[i] == ppd->pid)
316 return -1;
317 GSF_connected_peer_get_identity (peer->peer,
318 &id);
319 dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
320 &id.hashPubKey);
321 /* closer distance, higher score: */
322 return UINT32_MAX - dist;
323}
324
325
326/**
327 * If the migration task is not currently running, consider
328 * (re)scheduling it with the appropriate delay.
329 */
330static void
331consider_gathering (void);
332
333
334/**
335 * Find content for migration to this peer.
336 *
337 * @param mig peer to find content for
338 */
339static void
340find_content (struct MigrationReadyPeer *mrp)
341{
342 struct MigrationReadyBlock *pos;
343 long score;
344 long best_score;
345 struct MigrationReadyBlock *best;
346
347 GNUNET_assert (NULL == mrp->th);
348 best = NULL;
349 best_score = -1;
350 pos = mig_qe;
351 while (NULL != pos)
230 { 352 {
231 for (i=0;i<MIGRATION_LIST_SIZE;i++) 353 score = score_content (mrp, pos);
354 if (score > best_score)
232 { 355 {
233 if (cp->pid == pos->target_list[i]) 356 best_score = score;
234 { 357 best = mrp;
235 if (msize == 0)
236 msize = pos->size;
237 else
238 msize = GNUNET_MIN (msize,
239 pos->size);
240 break;
241 }
242 } 358 }
243 pos = pos->next; 359 pos = pos->next;
244 } 360 }
245 if (msize == 0) 361 if ( (NULL == best) &&
246 return GNUNET_YES; /* no content available */ 362 (mig_size >= MAX_MIGRATION_QUEUE) )
247#if DEBUG_FS
248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
249 "Trying to migrate at least %u bytes to peer `%s'\n",
250 msize,
251 GNUNET_h2s (key));
252#endif
253 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
254 { 363 {
255 GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task); 364 /* failed to find migration target AND
256 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; 365 queue is full, purge most-forwarded
366 block from queue to make room for more */
367 score = 0;
368 pos = mig_qe;
369 while (NULL != pos)
370 {
371 score = count_targets (pos);
372 if (score >= best_score)
373 {
374 best_score = score;
375 best = mrp;
376 }
377 pos = pos->next;
378 }
379 GNUNET_assert (NULL != best);
380 delete_migration_block (best);
381 consider_gathering ();
382 return;
257 } 383 }
258 cp->cth 384 transmit_content (peer, best);
259 = GNUNET_CORE_notify_transmit_ready (core,
260 0, GNUNET_TIME_UNIT_FOREVER_REL,
261 (const struct GNUNET_PeerIdentity*) key,
262 msize + sizeof (struct PutMessage),
263 &transmit_to_peer,
264 cp);
265 return GNUNET_YES;
266} 385}
267 386
268 387
@@ -278,23 +397,23 @@ gather_migration_blocks (void *cls,
278 const struct GNUNET_SCHEDULER_TaskContext *tc); 397 const struct GNUNET_SCHEDULER_TaskContext *tc);
279 398
280 399
281
282
283/** 400/**
284 * If the migration task is not currently running, consider 401 * If the migration task is not currently running, consider
285 * (re)scheduling it with the appropriate delay. 402 * (re)scheduling it with the appropriate delay.
286 */ 403 */
287static void 404static void
288consider_migration_gathering () 405consider_gathering ()
289{ 406{
290 struct GNUNET_TIME_Relative delay; 407 struct GNUNET_TIME_Relative delay;
291 408
292 if (dsh == NULL) 409 if (GSF_dsh == NULL)
293 return; 410 return;
294 if (mig_qe != NULL) 411 if (mig_qe != NULL)
295 return; 412 return;
296 if (mig_task != GNUNET_SCHEDULER_NO_TASK) 413 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
297 return; 414 return;
415 if (mig_size >= MAX_MIGRATION_QUEUE)
416 return;
298 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 417 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
299 mig_size); 418 mig_size);
300 delay = GNUNET_TIME_relative_divide (delay, 419 delay = GNUNET_TIME_relative_divide (delay,
@@ -307,8 +426,6 @@ consider_migration_gathering ()
307} 426}
308 427
309 428
310
311
312/** 429/**
313 * Process content offered for migration. 430 * Process content offered for migration.
314 * 431 *
@@ -335,19 +452,19 @@ process_migration_content (void *cls,
335 expiration, uint64_t uid) 452 expiration, uint64_t uid)
336{ 453{
337 struct MigrationReadyBlock *mb; 454 struct MigrationReadyBlock *mb;
455 struct MigrationReadyPeer *pos;
338 456
339 if (key == NULL) 457 if (key == NULL)
340 { 458 {
341 mig_qe = NULL; 459 mig_qe = NULL;
342 if (mig_size < MAX_MIGRATION_QUEUE) 460 consider_gathering ();
343 consider_migration_gathering ();
344 return; 461 return;
345 } 462 }
346 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < 463 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
347 MIN_MIGRATION_CONTENT_LIFETIME.rel_value) 464 MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
348 { 465 {
349 /* content will expire soon, don't bother */ 466 /* content will expire soon, don't bother */
350 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 467 GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
351 return; 468 return;
352 } 469 }
353 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 470 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
@@ -359,7 +476,7 @@ process_migration_content (void *cls,
359 &process_migration_content, 476 &process_migration_content,
360 NULL)) 477 NULL))
361 { 478 {
362 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 479 GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
363 } 480 }
364 return; 481 return;
365 } 482 }
@@ -380,14 +497,20 @@ process_migration_content (void *cls,
380 mig_tail, 497 mig_tail,
381 mb); 498 mb);
382 mig_size++; 499 mig_size++;
383 GNUNET_CONTAINER_multihashmap_iterate (connected_peers, 500 pos = peer_head;
384 &consider_migration, 501 while (pos != NULL)
385 mb); 502 {
386 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 503 if (NULL == pos->th)
504 {
505 if (GNUNET_YES == transmit_content (pos, mb))
506 break; /* 'mb' was freed! */
507 }
508 pos = pos->next;
509 }
510 GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
387} 511}
388 512
389 513
390
391/** 514/**
392 * Task that is run periodically to obtain blocks for content 515 * Task that is run periodically to obtain blocks for content
393 * migration 516 * migration
@@ -400,9 +523,10 @@ gather_migration_blocks (void *cls,
400 const struct GNUNET_SCHEDULER_TaskContext *tc) 523 const struct GNUNET_SCHEDULER_TaskContext *tc)
401{ 524{
402 mig_task = GNUNET_SCHEDULER_NO_TASK; 525 mig_task = GNUNET_SCHEDULER_NO_TASK;
403 if (dsh != NULL) 526 if (GSF_dsh != NULL)
404 { 527 {
405 mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX, 528 mig_qe = GNUNET_DATASTORE_get_random (GSF_dsh,
529 0, UINT_MAX,
406 GNUNET_TIME_UNIT_FOREVER_REL, 530 GNUNET_TIME_UNIT_FOREVER_REL,
407 &process_migration_content, NULL); 531 &process_migration_content, NULL);
408 GNUNET_assert (mig_qe != NULL); 532 GNUNET_assert (mig_qe != NULL);
@@ -410,66 +534,108 @@ gather_migration_blocks (void *cls,
410} 534}
411 535
412 536
537/**
538 * A peer connected to us. Start pushing content
539 * to this peer.
540 *
541 * @param peer handle for the peer that connected
542 */
543void
544GSF_push_start_ (struct GSF_ConnectedPeer *peer)
545{
546 struct MigrationReadyPeer *mrp;
413 547
414size_t 548 if (GNUNET_YES != enabled)
415API_ (void *cls, 549 return;
416 size_t size, void *buf) 550 mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer));
551 mrp->peer = peer;
552 find_content (mrp);
553 GNUNET_CONTAINER_DLL_insert (peer_head,
554 peer_tail,
555 mrp);
556}
557
558
559/**
560 * A peer disconnected from us. Stop pushing content
561 * to this peer.
562 *
563 * @param peer handle for the peer that disconnected
564 */
565void
566GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
417{ 567{
418 next = mig_head; 568 struct MigrationReadyPeer *pos;
419 while (NULL != (mb = next)) 569
570 pos = peer_head;
571 while (pos != NULL)
572 {
573 if (pos->peer == peer)
420 { 574 {
421 next = mb->next; 575 GNUNET_CONTAINER_DLL_remove (peer_head,
422 for (i=0;i<MIGRATION_LIST_SIZE;i++) 576 peer_tail,
423 { 577 pos);
424 if ( (cp->pid == mb->target_list[i]) && 578 if (NULL != pos->th)
425 (mb->size + sizeof (migm) <= size) ) 579 GSF_peer_transmit_cancel_ (pos->th);
426 { 580 GNUNET_free (pos);
427 GNUNET_PEER_change_rc (mb->target_list[i], -1); 581 return;
428 mb->target_list[i] = 0;
429 mb->used_targets++;
430 memset (&migm, 0, sizeof (migm));
431 migm.header.size = htons (sizeof (migm) + mb->size);
432 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
433 migm.type = htonl (mb->type);
434 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
435 memcpy (&cbuf[msize], &migm, sizeof (migm));
436 msize += sizeof (migm);
437 size -= sizeof (migm);
438 memcpy (&cbuf[msize], &mb[1], mb->size);
439 msize += mb->size;
440 size -= mb->size;
441#if DEBUG_FS
442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443 "Pushing migration block `%s' (%u bytes) to `%s'\n",
444 GNUNET_h2s (&mb->query),
445 (unsigned int) mb->size,
446 GNUNET_i2s (&pid));
447#endif
448 break;
449 }
450 else
451 {
452#if DEBUG_FS
453 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
454 "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
455 GNUNET_h2s (&mb->query),
456 (unsigned int) mb->size,
457 GNUNET_i2s (&pid));
458#endif
459 }
460 }
461 if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
462 (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
463 {
464 delete_migration_block (mb);
465 consider_migration_gathering ();
466 }
467 } 582 }
468 consider_migration (NULL, 583 pos = pos->next;
469 &pid.hashPubKey, 584 }
470 cp); 585}
586
471 587
588/**
589 * Setup the module.
590 *
591 * @param cfg configuration to use
592 */
593void
594GSF_push_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
595{
596 int enabled;
597
598 enabled = GNUNET_CONFIGURATION_get_value_yesno (cfg,
599 "FS",
600 "CONTENT_PUSHING");
601 if (GNUNET_YES != enabled)
602 return;
603
604 if (GNUNET_OK !=
605 GNUNET_CONFIGURATION_get_value_time (cfg,
606 "fs",
607 "MIN_MIGRATION_DELAY",
608 &min_migration_delay))
609 {
610 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
611 _("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"),
612 "MIN_MIGRATION_DELAY",
613 "fs");
614 return;
615 }
616 consider_gathering ();
472} 617}
473 618
474 619
620/**
621 * Shutdown the module.
622 */
623void
624GSF_push_done_ ()
625{
626 if (GNUNET_SCHEDULER_NO_TASK != mig_task)
627 {
628 GNUNET_SCHEDULER_cancel (mig_task);
629 mig_task = GNUNET_SCHEDULER_NO_TASK;
630 }
631 if (NULL != mig_qe)
632 {
633 GNUNET_DATASTORE_cancel (mig_qe);
634 mig_qe = NULL;
635 }
636 while (NULL != mig_head)
637 delete_migration_block (mig_head);
638 GNUNET_assert (0 == mig_size);
639}
475 640
641/* end of gnunet-service-fs_push.c */
diff --git a/src/fs/gnunet-service-fs_push.h b/src/fs/gnunet-service-fs_push.h
new file mode 100644
index 000000000..5f7a0030c
--- /dev/null
+++ b/src/fs/gnunet-service-fs_push.h
@@ -0,0 +1,68 @@
1/*
2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file fs/gnunet-service-fs_push.h
23 * @brief support for pushing out content
24 * @author Christian Grothoff
25 */
26#ifndef GNUNET_SERVICE_FS_PUSH_H
27#define GNUNET_SERVICE_FS_PUSH_H
28
29#include "gnunet-service-fs.h"
30
31
32/**
33 * Setup the module.
34 *
35 * @param cfg configuration to use
36 */
37void
38GSF_push_init_ (struct GNUNET_CONFIGURATION_Handle *cfg);
39
40
41/**
42 * Shutdown the module.
43 */
44void
45GSF_push_done_ (void);
46
47
48/**
49 * A peer connected to us or we are now again allowed to push content.
50 * Start pushing content to this peer.
51 *
52 * @param peer handle for the peer that connected
53 */
54void
55GSF_push_start_ (struct GSF_ConnectedPeer *peer);
56
57
58/**
59 * A peer disconnected from us or asked us to stop pushing content for
60 * a while. Stop pushing content to this peer.
61 *
62 * @param peer handle for the peer that disconnected
63 */
64void
65GSF_push_stop_ (struct GSF_ConnectedPeer *peer);
66
67
68#endif
diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c
index eb7289f1e..862a795ba 100644
--- a/src/fs/gnunet-service-fs_put.c
+++ b/src/fs/gnunet-service-fs_put.c
@@ -26,15 +26,12 @@
26#include "platform.h" 26#include "platform.h"
27#include "gnunet-service-fs_put.h" 27#include "gnunet-service-fs_put.h"
28 28
29/* FIXME: below are only old code fragments to use... */
30
31 29
32/** 30/**
33 * Request to datastore for DHT PUTs (or NULL). 31 * Request to datastore for DHT PUTs (or NULL).
34 */ 32 */
35static struct GNUNET_DATASTORE_QueueEntry *dht_qe; 33static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
36 34
37
38/** 35/**
39 * Type we will request for the next DHT PUT round from the datastore. 36 * Type we will request for the next DHT PUT round from the datastore.
40 */ 37 */
@@ -52,9 +49,6 @@ static GNUNET_SCHEDULER_TaskIdentifier dht_task;
52static unsigned int zero_anonymity_count_estimate; 49static unsigned int zero_anonymity_count_estimate;
53 50
54 51
55
56
57
58/** 52/**
59 * Task that is run periodically to obtain blocks for DHT PUTs. 53 * Task that is run periodically to obtain blocks for DHT PUTs.
60 * 54 *
@@ -101,7 +95,6 @@ consider_dht_put_gathering (void *cls)
101} 95}
102 96
103 97
104
105/** 98/**
106 * Store content in DHT. 99 * Store content in DHT.
107 * 100 *
@@ -172,7 +165,6 @@ process_dht_put_content (void *cls,
172} 165}
173 166
174 167
175
176/** 168/**
177 * Task that is run periodically to obtain blocks for DHT PUTs. 169 * Task that is run periodically to obtain blocks for DHT PUTs.
178 * 170 *
@@ -195,3 +187,36 @@ gather_dht_put_blocks (void *cls,
195 GNUNET_assert (dht_qe != NULL); 187 GNUNET_assert (dht_qe != NULL);
196 } 188 }
197} 189}
190
191
192/**
193 * Setup the module.
194 *
195 * @param cfg configuration to use
196 */
197void
198GSF_put_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
199{
200 dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, NULL);
201}
202
203
204/**
205 * Shutdown the module.
206 */
207void
208GSF_put_done_ ()
209{
210 if (GNUNET_SCHEDULER_NO_TASK != dht_task)
211 {
212 GNUNET_SCHEDULER_cancel (dht_task);
213 dht_task = GNUNET_SCHEDULER_NO_TASK;
214 }
215 if (NULL != dht_qe)
216 {
217 GNUNET_DATASTORE_cancel (dht_qe);
218 dht_qe = NULL;
219 }
220}
221
222/* end of gnunet-service-fs_put.c */
diff --git a/src/fs/gnunet-service-fs_put.h b/src/fs/gnunet-service-fs_put.h
new file mode 100644
index 000000000..ebec818b2
--- /dev/null
+++ b/src/fs/gnunet-service-fs_put.h
@@ -0,0 +1,48 @@
1/*
2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file fs/gnunet-service-fs_put.h
23 * @brief support for putting content into the DHT
24 * @author Christian Grothoff
25 */
26#ifndef GNUNET_SERVICE_FS_PUT_H
27#define GNUNET_SERVICE_FS_PUT_H
28
29#include "gnunet-service-fs.h"
30
31
32/**
33 * Setup the module.
34 *
35 * @param cfg configuration to use
36 */
37void
38GSF_put_init_ (struct GNUNET_CONFIGURATION_Handle *cfg);
39
40
41/**
42 * Shutdown the module.
43 */
44void
45GSF_put_done_ (void);
46
47
48#endif