aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_push.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-31 21:23:23 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-31 21:23:23 +0000
commita78990b412db2c0ead2da8061c4f454f068991d1 (patch)
tree2e87adae62fd1ca3cd5a9f4c69248986f78bb106 /src/fs/gnunet-service-fs_push.c
parent406c7d2d2d126c994a1fff13470b1f96439c6f9d (diff)
downloadgnunet-a78990b412db2c0ead2da8061c4f454f068991d1.tar.gz
gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.zip
converting FS to new MQ-based core API
Diffstat (limited to 'src/fs/gnunet-service-fs_push.c')
-rw-r--r--src/fs/gnunet-service-fs_push.c190
1 files changed, 76 insertions, 114 deletions
diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c
index 59f3772f5..1573bc160 100644
--- a/src/fs/gnunet-service-fs_push.c
+++ b/src/fs/gnunet-service-fs_push.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2011 GNUnet e.V. 3 Copyright (C) 2011, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -102,8 +102,7 @@ struct MigrationReadyBlock
102 102
103 103
104/** 104/**
105 * Information about a peer waiting for 105 * Information about a peer waiting for migratable data.
106 * migratable data.
107 */ 106 */
108struct MigrationReadyPeer 107struct MigrationReadyPeer
109{ 108{
@@ -123,15 +122,9 @@ struct MigrationReadyPeer
123 struct GSF_ConnectedPeer *peer; 122 struct GSF_ConnectedPeer *peer;
124 123
125 /** 124 /**
126 * Handle for current transmission request, 125 * Envelope of the currently pushed message.
127 * or NULL for none.
128 */ 126 */
129 struct GSF_PeerTransmitHandle *th; 127 struct GNUNET_MQ_Envelope *env;
130
131 /**
132 * Message we are trying to push right now (or NULL)
133 */
134 struct PutMessage *msg;
135}; 128};
136 129
137 130
@@ -163,7 +156,7 @@ static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
163/** 156/**
164 * ID of task that collects blocks for migration. 157 * ID of task that collects blocks for migration.
165 */ 158 */
166static struct GNUNET_SCHEDULER_Task * mig_task; 159static struct GNUNET_SCHEDULER_Task *mig_task;
167 160
168/** 161/**
169 * What is the maximum frequency at which we are allowed to 162 * What is the maximum frequency at which we are allowed to
@@ -195,8 +188,11 @@ static int value_found;
195static void 188static void
196delete_migration_block (struct MigrationReadyBlock *mb) 189delete_migration_block (struct MigrationReadyBlock *mb)
197{ 190{
198 GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb); 191 GNUNET_CONTAINER_DLL_remove (mig_head,
199 GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE); 192 mig_tail,
193 mb);
194 GNUNET_PEER_decrement_rcs (mb->target_list,
195 MIGRATION_LIST_SIZE);
200 mig_size--; 196 mig_size--;
201 GNUNET_free (mb); 197 GNUNET_free (mb);
202} 198}
@@ -204,49 +200,11 @@ delete_migration_block (struct MigrationReadyBlock *mb)
204 200
205/** 201/**
206 * Find content for migration to this peer. 202 * Find content for migration to this peer.
207 */
208static void
209find_content (struct MigrationReadyPeer *mrp);
210
211
212/**
213 * Transmit the message currently scheduled for transmission.
214 * 203 *
215 * @param cls the `struct MigrationReadyPeer` 204 * @param cls a `struct MigrationReadyPeer *`
216 * @param buf_size number of bytes available in @a buf
217 * @param buf where to copy the message, NULL on error (peer disconnect)
218 * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
219 */ 205 */
220static size_t 206static void
221transmit_message (void *cls, 207find_content (void *cls);
222 size_t buf_size,
223 void *buf)
224{
225 struct MigrationReadyPeer *peer = cls;
226 struct PutMessage *msg;
227 uint16_t msize;
228
229 peer->th = NULL;
230 msg = peer->msg;
231 peer->msg = NULL;
232 if (NULL == buf)
233 {
234 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
235 "Failed to migrate content to another peer (disconnect)\n");
236 GNUNET_free (msg);
237 return 0;
238 }
239 msize = ntohs (msg->header.size);
240 GNUNET_assert (msize <= buf_size);
241 GNUNET_memcpy (buf, msg, msize);
242 GNUNET_free (msg);
243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244 "Pushing %u bytes to %s\n",
245 msize,
246 GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
247 find_content (peer);
248 return msize;
249}
250 208
251 209
252/** 210/**
@@ -257,31 +215,30 @@ transmit_message (void *cls,
257 * @return #GNUNET_YES if the block was deleted (!) 215 * @return #GNUNET_YES if the block was deleted (!)
258 */ 216 */
259static int 217static int
260transmit_content (struct MigrationReadyPeer *peer, 218transmit_content (struct MigrationReadyPeer *mrp,
261 struct MigrationReadyBlock *block) 219 struct MigrationReadyBlock *block)
262{ 220{
263 size_t msize;
264 struct PutMessage *msg; 221 struct PutMessage *msg;
265 unsigned int i; 222 unsigned int i;
266 struct GSF_PeerPerformanceData *ppd; 223 struct GSF_PeerPerformanceData *ppd;
267 int ret; 224 int ret;
268 225
269 ppd = GSF_get_peer_performance_data_ (peer->peer); 226 ppd = GSF_get_peer_performance_data_ (mrp->peer);
270 GNUNET_assert (NULL == peer->th); 227 mrp->env = GNUNET_MQ_msg_extra (msg,
271 msize = sizeof (struct PutMessage) + block->size; 228 block->size,
272 msg = GNUNET_malloc (msize); 229 GNUNET_MESSAGE_TYPE_FS_PUT);
273 msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
274 msg->header.size = htons (msize);
275 msg->type = htonl (block->type); 230 msg->type = htonl (block->type);
276 msg->expiration = GNUNET_TIME_absolute_hton (block->expiration); 231 msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
277 GNUNET_memcpy (&msg[1], &block[1], block->size); 232 GNUNET_memcpy (&msg[1],
278 peer->msg = msg; 233 &block[1],
234 block->size);
279 for (i = 0; i < MIGRATION_LIST_SIZE; i++) 235 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
280 { 236 {
281 if (block->target_list[i] == 0) 237 if (block->target_list[i] == 0)
282 { 238 {
283 block->target_list[i] = ppd->pid; 239 block->target_list[i] = ppd->pid;
284 GNUNET_PEER_change_rc (block->target_list[i], 1); 240 GNUNET_PEER_change_rc (block->target_list[i],
241 1);
285 break; 242 break;
286 } 243 }
287 } 244 }
@@ -294,15 +251,13 @@ transmit_content (struct MigrationReadyPeer *peer,
294 { 251 {
295 ret = GNUNET_NO; 252 ret = GNUNET_NO;
296 } 253 }
297 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 254 GNUNET_MQ_notify_sent (mrp->env,
298 "Asking for transmission of %u bytes to %s for migration\n", 255 &find_content,
299 (unsigned int) msize, 256 mrp);
300 GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); 257 GSF_peer_transmit_ (mrp->peer,
301 peer->th = GSF_peer_transmit_ (peer->peer, 258 GNUNET_NO,
302 GNUNET_NO, 0 /* priority */ , 259 0 /* priority */ ,
303 GNUNET_TIME_UNIT_FOREVER_REL, 260 mrp->env);
304 msize,
305 &transmit_message, peer);
306 return ret; 261 return ret;
307} 262}
308 263
@@ -330,12 +285,12 @@ count_targets (struct MigrationReadyBlock *block)
330 * Check if sending this block to this peer would 285 * Check if sending this block to this peer would
331 * be a good idea. 286 * be a good idea.
332 * 287 *
333 * @param peer target peer 288 * @param mrp target peer
334 * @param block the block 289 * @param block the block
335 * @return score (>= 0: feasible, negative: infeasible) 290 * @return score (>= 0: feasible, negative: infeasible)
336 */ 291 */
337static long 292static long
338score_content (struct MigrationReadyPeer *peer, 293score_content (struct MigrationReadyPeer *mrp,
339 struct MigrationReadyBlock *block) 294 struct MigrationReadyBlock *block)
340{ 295{
341 unsigned int i; 296 unsigned int i;
@@ -344,14 +299,18 @@ score_content (struct MigrationReadyPeer *peer,
344 struct GNUNET_HashCode hc; 299 struct GNUNET_HashCode hc;
345 uint32_t dist; 300 uint32_t dist;
346 301
347 ppd = GSF_get_peer_performance_data_ (peer->peer); 302 ppd = GSF_get_peer_performance_data_ (mrp->peer);
348 for (i = 0; i < MIGRATION_LIST_SIZE; i++) 303 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
349 if (block->target_list[i] == ppd->pid) 304 if (block->target_list[i] == ppd->pid)
350 return -1; 305 return -1;
351 GNUNET_assert (0 != ppd->pid); 306 GNUNET_assert (0 != ppd->pid);
352 GNUNET_PEER_resolve (ppd->pid, &id); 307 GNUNET_PEER_resolve (ppd->pid,
353 GNUNET_CRYPTO_hash (&id, sizeof (struct GNUNET_PeerIdentity), &hc); 308 &id);
354 dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &hc); 309 GNUNET_CRYPTO_hash (&id,
310 sizeof (struct GNUNET_PeerIdentity),
311 &hc);
312 dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
313 &hc);
355 /* closer distance, higher score: */ 314 /* closer distance, higher score: */
356 return UINT32_MAX - dist; 315 return UINT32_MAX - dist;
357} 316}
@@ -368,17 +327,18 @@ consider_gathering (void);
368/** 327/**
369 * Find content for migration to this peer. 328 * Find content for migration to this peer.
370 * 329 *
371 * @param mrp peer to find content for 330 * @param cls peer to find content for
372 */ 331 */
373static void 332static void
374find_content (struct MigrationReadyPeer *mrp) 333find_content (void *cls)
375{ 334{
335 struct MigrationReadyPeer *mrp = cls;
376 struct MigrationReadyBlock *pos; 336 struct MigrationReadyBlock *pos;
377 long score; 337 long score;
378 long best_score; 338 long best_score;
379 struct MigrationReadyBlock *best; 339 struct MigrationReadyBlock *best;
380 340
381 GNUNET_assert (NULL == mrp->th); 341 mrp->env = NULL;
382 best = NULL; 342 best = NULL;
383 best_score = -1; 343 best_score = -1;
384 pos = mig_head; 344 pos = mig_head;
@@ -423,7 +383,8 @@ find_content (struct MigrationReadyPeer *mrp)
423 } 383 }
424 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 384 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425 "Preparing to push best content to peer\n"); 385 "Preparing to push best content to peer\n");
426 transmit_content (mrp, best); 386 transmit_content (mrp,
387 best);
427} 388}
428 389
429 390
@@ -454,9 +415,12 @@ consider_gathering ()
454 return; 415 return;
455 if (mig_size >= MAX_MIGRATION_QUEUE) 416 if (mig_size >= MAX_MIGRATION_QUEUE)
456 return; 417 return;
457 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size); 418 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
458 delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE); 419 mig_size);
459 delay = GNUNET_TIME_relative_max (delay, min_migration_delay); 420 delay = GNUNET_TIME_relative_divide (delay,
421 MAX_MIGRATION_QUEUE);
422 delay = GNUNET_TIME_relative_max (delay,
423 min_migration_delay);
460 if (GNUNET_NO == value_found) 424 if (GNUNET_NO == value_found)
461 { 425 {
462 /* wait at least 5s if the datastore is empty */ 426 /* wait at least 5s if the datastore is empty */
@@ -467,8 +431,9 @@ consider_gathering ()
467 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
468 "Scheduling gathering task (queue size: %u)\n", 432 "Scheduling gathering task (queue size: %u)\n",
469 mig_size); 433 mig_size);
470 mig_task = 434 mig_task = GNUNET_SCHEDULER_add_delayed (delay,
471 GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL); 435 &gather_migration_blocks,
436 NULL);
472} 437}
473 438
474 439
@@ -549,14 +514,12 @@ process_migration_content (void *cls,
549 mig_size++; 514 mig_size++;
550 for (pos = peer_head; NULL != pos; pos = pos->next) 515 for (pos = peer_head; NULL != pos; pos = pos->next)
551 { 516 {
552 if (NULL == pos->th) 517 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553 { 518 "Preparing to push best content to peer %s\n",
554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 519 GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
555 "Preparing to push best content to peer %s\n", 520 if (GNUNET_YES == transmit_content (pos,
556 GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer))); 521 mb))
557 if (GNUNET_YES == transmit_content (pos, mb)) 522 break; /* 'mb' was freed! */
558 break; /* 'mb' was freed! */
559 }
560 } 523 }
561 consider_gathering (); 524 consider_gathering ();
562} 525}
@@ -580,9 +543,11 @@ gather_migration_blocks (void *cls)
580 "Asking datastore for content for replication (queue size: %u)\n", 543 "Asking datastore for content for replication (queue size: %u)\n",
581 mig_size); 544 mig_size);
582 value_found = GNUNET_NO; 545 value_found = GNUNET_NO;
583 mig_qe = 546 mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
584 GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX, 547 0,
585 &process_migration_content, NULL); 548 UINT_MAX,
549 &process_migration_content,
550 NULL);
586 if (NULL == mig_qe) 551 if (NULL == mig_qe)
587 consider_gathering (); 552 consider_gathering ();
588} 553}
@@ -640,19 +605,11 @@ GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
640 break; 605 break;
641 if (NULL == pos) 606 if (NULL == pos)
642 return; 607 return;
608 if (NULL != pos->env)
609 GNUNET_MQ_send_cancel (pos->env);
643 GNUNET_CONTAINER_DLL_remove (peer_head, 610 GNUNET_CONTAINER_DLL_remove (peer_head,
644 peer_tail, 611 peer_tail,
645 pos); 612 pos);
646 if (NULL != pos->th)
647 {
648 GSF_peer_transmit_cancel_ (pos->th);
649 pos->th = NULL;
650 }
651 if (NULL != pos->msg)
652 {
653 GNUNET_free (pos->msg);
654 pos->msg = NULL;
655 }
656 GNUNET_free (pos); 613 GNUNET_free (pos);
657} 614}
658 615
@@ -664,16 +621,21 @@ void
664GSF_push_init_ () 621GSF_push_init_ ()
665{ 622{
666 enabled = 623 enabled =
667 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING"); 624 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
625 "FS",
626 "CONTENT_PUSHING");
668 if (GNUNET_YES != enabled) 627 if (GNUNET_YES != enabled)
669 return; 628 return;
670 629
671 if (GNUNET_OK != 630 if (GNUNET_OK !=
672 GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY", 631 GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
632 "fs",
633 "MIN_MIGRATION_DELAY",
673 &min_migration_delay)) 634 &min_migration_delay))
674 { 635 {
675 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING, 636 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
676 "fs", "MIN_MIGRATION_DELAY", 637 "fs",
638 "MIN_MIGRATION_DELAY",
677 _("time required, content pushing disabled")); 639 _("time required, content pushing disabled"));
678 return; 640 return;
679 } 641 }