diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-07-31 21:23:23 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-07-31 21:23:23 +0000 |
commit | a78990b412db2c0ead2da8061c4f454f068991d1 (patch) | |
tree | 2e87adae62fd1ca3cd5a9f4c69248986f78bb106 /src/fs/gnunet-service-fs_push.c | |
parent | 406c7d2d2d126c994a1fff13470b1f96439c6f9d (diff) | |
download | gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.tar.gz gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.zip |
converting FS to new MQ-based core API
Diffstat (limited to 'src/fs/gnunet-service-fs_push.c')
-rw-r--r-- | src/fs/gnunet-service-fs_push.c | 190 |
1 files changed, 76 insertions, 114 deletions
diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c index 59f3772f5..1573bc160 100644 --- a/src/fs/gnunet-service-fs_push.c +++ b/src/fs/gnunet-service-fs_push.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2011 GNUnet e.V. | 3 | Copyright (C) 2011, 2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -102,8 +102,7 @@ struct MigrationReadyBlock | |||
102 | 102 | ||
103 | 103 | ||
104 | /** | 104 | /** |
105 | * Information about a peer waiting for | 105 | * Information about a peer waiting for migratable data. |
106 | * migratable data. | ||
107 | */ | 106 | */ |
108 | struct MigrationReadyPeer | 107 | struct MigrationReadyPeer |
109 | { | 108 | { |
@@ -123,15 +122,9 @@ struct MigrationReadyPeer | |||
123 | struct GSF_ConnectedPeer *peer; | 122 | struct GSF_ConnectedPeer *peer; |
124 | 123 | ||
125 | /** | 124 | /** |
126 | * Handle for current transmission request, | 125 | * Envelope of the currently pushed message. |
127 | * or NULL for none. | ||
128 | */ | 126 | */ |
129 | struct GSF_PeerTransmitHandle *th; | 127 | struct GNUNET_MQ_Envelope *env; |
130 | |||
131 | /** | ||
132 | * Message we are trying to push right now (or NULL) | ||
133 | */ | ||
134 | struct PutMessage *msg; | ||
135 | }; | 128 | }; |
136 | 129 | ||
137 | 130 | ||
@@ -163,7 +156,7 @@ static struct GNUNET_DATASTORE_QueueEntry *mig_qe; | |||
163 | /** | 156 | /** |
164 | * ID of task that collects blocks for migration. | 157 | * ID of task that collects blocks for migration. |
165 | */ | 158 | */ |
166 | static struct GNUNET_SCHEDULER_Task * mig_task; | 159 | static struct GNUNET_SCHEDULER_Task *mig_task; |
167 | 160 | ||
168 | /** | 161 | /** |
169 | * What is the maximum frequency at which we are allowed to | 162 | * What is the maximum frequency at which we are allowed to |
@@ -195,8 +188,11 @@ static int value_found; | |||
195 | static void | 188 | static void |
196 | delete_migration_block (struct MigrationReadyBlock *mb) | 189 | delete_migration_block (struct MigrationReadyBlock *mb) |
197 | { | 190 | { |
198 | GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb); | 191 | GNUNET_CONTAINER_DLL_remove (mig_head, |
199 | GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE); | 192 | mig_tail, |
193 | mb); | ||
194 | GNUNET_PEER_decrement_rcs (mb->target_list, | ||
195 | MIGRATION_LIST_SIZE); | ||
200 | mig_size--; | 196 | mig_size--; |
201 | GNUNET_free (mb); | 197 | GNUNET_free (mb); |
202 | } | 198 | } |
@@ -204,49 +200,11 @@ delete_migration_block (struct MigrationReadyBlock *mb) | |||
204 | 200 | ||
205 | /** | 201 | /** |
206 | * Find content for migration to this peer. | 202 | * Find content for migration to this peer. |
207 | */ | ||
208 | static void | ||
209 | find_content (struct MigrationReadyPeer *mrp); | ||
210 | |||
211 | |||
212 | /** | ||
213 | * Transmit the message currently scheduled for transmission. | ||
214 | * | 203 | * |
215 | * @param cls the `struct MigrationReadyPeer` | 204 | * @param cls a `struct MigrationReadyPeer *` |
216 | * @param buf_size number of bytes available in @a buf | ||
217 | * @param buf where to copy the message, NULL on error (peer disconnect) | ||
218 | * @return number of bytes copied to @a buf, can be 0 (without indicating an error) | ||
219 | */ | 205 | */ |
220 | static size_t | 206 | static void |
221 | transmit_message (void *cls, | 207 | find_content (void *cls); |
222 | size_t buf_size, | ||
223 | void *buf) | ||
224 | { | ||
225 | struct MigrationReadyPeer *peer = cls; | ||
226 | struct PutMessage *msg; | ||
227 | uint16_t msize; | ||
228 | |||
229 | peer->th = NULL; | ||
230 | msg = peer->msg; | ||
231 | peer->msg = NULL; | ||
232 | if (NULL == buf) | ||
233 | { | ||
234 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
235 | "Failed to migrate content to another peer (disconnect)\n"); | ||
236 | GNUNET_free (msg); | ||
237 | return 0; | ||
238 | } | ||
239 | msize = ntohs (msg->header.size); | ||
240 | GNUNET_assert (msize <= buf_size); | ||
241 | GNUNET_memcpy (buf, msg, msize); | ||
242 | GNUNET_free (msg); | ||
243 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
244 | "Pushing %u bytes to %s\n", | ||
245 | msize, | ||
246 | GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); | ||
247 | find_content (peer); | ||
248 | return msize; | ||
249 | } | ||
250 | 208 | ||
251 | 209 | ||
252 | /** | 210 | /** |
@@ -257,31 +215,30 @@ transmit_message (void *cls, | |||
257 | * @return #GNUNET_YES if the block was deleted (!) | 215 | * @return #GNUNET_YES if the block was deleted (!) |
258 | */ | 216 | */ |
259 | static int | 217 | static int |
260 | transmit_content (struct MigrationReadyPeer *peer, | 218 | transmit_content (struct MigrationReadyPeer *mrp, |
261 | struct MigrationReadyBlock *block) | 219 | struct MigrationReadyBlock *block) |
262 | { | 220 | { |
263 | size_t msize; | ||
264 | struct PutMessage *msg; | 221 | struct PutMessage *msg; |
265 | unsigned int i; | 222 | unsigned int i; |
266 | struct GSF_PeerPerformanceData *ppd; | 223 | struct GSF_PeerPerformanceData *ppd; |
267 | int ret; | 224 | int ret; |
268 | 225 | ||
269 | ppd = GSF_get_peer_performance_data_ (peer->peer); | 226 | ppd = GSF_get_peer_performance_data_ (mrp->peer); |
270 | GNUNET_assert (NULL == peer->th); | 227 | mrp->env = GNUNET_MQ_msg_extra (msg, |
271 | msize = sizeof (struct PutMessage) + block->size; | 228 | block->size, |
272 | msg = GNUNET_malloc (msize); | 229 | GNUNET_MESSAGE_TYPE_FS_PUT); |
273 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); | ||
274 | msg->header.size = htons (msize); | ||
275 | msg->type = htonl (block->type); | 230 | msg->type = htonl (block->type); |
276 | msg->expiration = GNUNET_TIME_absolute_hton (block->expiration); | 231 | msg->expiration = GNUNET_TIME_absolute_hton (block->expiration); |
277 | GNUNET_memcpy (&msg[1], &block[1], block->size); | 232 | GNUNET_memcpy (&msg[1], |
278 | peer->msg = msg; | 233 | &block[1], |
234 | block->size); | ||
279 | for (i = 0; i < MIGRATION_LIST_SIZE; i++) | 235 | for (i = 0; i < MIGRATION_LIST_SIZE; i++) |
280 | { | 236 | { |
281 | if (block->target_list[i] == 0) | 237 | if (block->target_list[i] == 0) |
282 | { | 238 | { |
283 | block->target_list[i] = ppd->pid; | 239 | block->target_list[i] = ppd->pid; |
284 | GNUNET_PEER_change_rc (block->target_list[i], 1); | 240 | GNUNET_PEER_change_rc (block->target_list[i], |
241 | 1); | ||
285 | break; | 242 | break; |
286 | } | 243 | } |
287 | } | 244 | } |
@@ -294,15 +251,13 @@ transmit_content (struct MigrationReadyPeer *peer, | |||
294 | { | 251 | { |
295 | ret = GNUNET_NO; | 252 | ret = GNUNET_NO; |
296 | } | 253 | } |
297 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 254 | GNUNET_MQ_notify_sent (mrp->env, |
298 | "Asking for transmission of %u bytes to %s for migration\n", | 255 | &find_content, |
299 | (unsigned int) msize, | 256 | mrp); |
300 | GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); | 257 | GSF_peer_transmit_ (mrp->peer, |
301 | peer->th = GSF_peer_transmit_ (peer->peer, | 258 | GNUNET_NO, |
302 | GNUNET_NO, 0 /* priority */ , | 259 | 0 /* priority */ , |
303 | GNUNET_TIME_UNIT_FOREVER_REL, | 260 | mrp->env); |
304 | msize, | ||
305 | &transmit_message, peer); | ||
306 | return ret; | 261 | return ret; |
307 | } | 262 | } |
308 | 263 | ||
@@ -330,12 +285,12 @@ count_targets (struct MigrationReadyBlock *block) | |||
330 | * Check if sending this block to this peer would | 285 | * Check if sending this block to this peer would |
331 | * be a good idea. | 286 | * be a good idea. |
332 | * | 287 | * |
333 | * @param peer target peer | 288 | * @param mrp target peer |
334 | * @param block the block | 289 | * @param block the block |
335 | * @return score (>= 0: feasible, negative: infeasible) | 290 | * @return score (>= 0: feasible, negative: infeasible) |
336 | */ | 291 | */ |
337 | static long | 292 | static long |
338 | score_content (struct MigrationReadyPeer *peer, | 293 | score_content (struct MigrationReadyPeer *mrp, |
339 | struct MigrationReadyBlock *block) | 294 | struct MigrationReadyBlock *block) |
340 | { | 295 | { |
341 | unsigned int i; | 296 | unsigned int i; |
@@ -344,14 +299,18 @@ score_content (struct MigrationReadyPeer *peer, | |||
344 | struct GNUNET_HashCode hc; | 299 | struct GNUNET_HashCode hc; |
345 | uint32_t dist; | 300 | uint32_t dist; |
346 | 301 | ||
347 | ppd = GSF_get_peer_performance_data_ (peer->peer); | 302 | ppd = GSF_get_peer_performance_data_ (mrp->peer); |
348 | for (i = 0; i < MIGRATION_LIST_SIZE; i++) | 303 | for (i = 0; i < MIGRATION_LIST_SIZE; i++) |
349 | if (block->target_list[i] == ppd->pid) | 304 | if (block->target_list[i] == ppd->pid) |
350 | return -1; | 305 | return -1; |
351 | GNUNET_assert (0 != ppd->pid); | 306 | GNUNET_assert (0 != ppd->pid); |
352 | GNUNET_PEER_resolve (ppd->pid, &id); | 307 | GNUNET_PEER_resolve (ppd->pid, |
353 | GNUNET_CRYPTO_hash (&id, sizeof (struct GNUNET_PeerIdentity), &hc); | 308 | &id); |
354 | dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &hc); | 309 | GNUNET_CRYPTO_hash (&id, |
310 | sizeof (struct GNUNET_PeerIdentity), | ||
311 | &hc); | ||
312 | dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, | ||
313 | &hc); | ||
355 | /* closer distance, higher score: */ | 314 | /* closer distance, higher score: */ |
356 | return UINT32_MAX - dist; | 315 | return UINT32_MAX - dist; |
357 | } | 316 | } |
@@ -368,17 +327,18 @@ consider_gathering (void); | |||
368 | /** | 327 | /** |
369 | * Find content for migration to this peer. | 328 | * Find content for migration to this peer. |
370 | * | 329 | * |
371 | * @param mrp peer to find content for | 330 | * @param cls peer to find content for |
372 | */ | 331 | */ |
373 | static void | 332 | static void |
374 | find_content (struct MigrationReadyPeer *mrp) | 333 | find_content (void *cls) |
375 | { | 334 | { |
335 | struct MigrationReadyPeer *mrp = cls; | ||
376 | struct MigrationReadyBlock *pos; | 336 | struct MigrationReadyBlock *pos; |
377 | long score; | 337 | long score; |
378 | long best_score; | 338 | long best_score; |
379 | struct MigrationReadyBlock *best; | 339 | struct MigrationReadyBlock *best; |
380 | 340 | ||
381 | GNUNET_assert (NULL == mrp->th); | 341 | mrp->env = NULL; |
382 | best = NULL; | 342 | best = NULL; |
383 | best_score = -1; | 343 | best_score = -1; |
384 | pos = mig_head; | 344 | pos = mig_head; |
@@ -423,7 +383,8 @@ find_content (struct MigrationReadyPeer *mrp) | |||
423 | } | 383 | } |
424 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 384 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
425 | "Preparing to push best content to peer\n"); | 385 | "Preparing to push best content to peer\n"); |
426 | transmit_content (mrp, best); | 386 | transmit_content (mrp, |
387 | best); | ||
427 | } | 388 | } |
428 | 389 | ||
429 | 390 | ||
@@ -454,9 +415,12 @@ consider_gathering () | |||
454 | return; | 415 | return; |
455 | if (mig_size >= MAX_MIGRATION_QUEUE) | 416 | if (mig_size >= MAX_MIGRATION_QUEUE) |
456 | return; | 417 | return; |
457 | delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size); | 418 | delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, |
458 | delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE); | 419 | mig_size); |
459 | delay = GNUNET_TIME_relative_max (delay, min_migration_delay); | 420 | delay = GNUNET_TIME_relative_divide (delay, |
421 | MAX_MIGRATION_QUEUE); | ||
422 | delay = GNUNET_TIME_relative_max (delay, | ||
423 | min_migration_delay); | ||
460 | if (GNUNET_NO == value_found) | 424 | if (GNUNET_NO == value_found) |
461 | { | 425 | { |
462 | /* wait at least 5s if the datastore is empty */ | 426 | /* wait at least 5s if the datastore is empty */ |
@@ -467,8 +431,9 @@ consider_gathering () | |||
467 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 431 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
468 | "Scheduling gathering task (queue size: %u)\n", | 432 | "Scheduling gathering task (queue size: %u)\n", |
469 | mig_size); | 433 | mig_size); |
470 | mig_task = | 434 | mig_task = GNUNET_SCHEDULER_add_delayed (delay, |
471 | GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL); | 435 | &gather_migration_blocks, |
436 | NULL); | ||
472 | } | 437 | } |
473 | 438 | ||
474 | 439 | ||
@@ -549,14 +514,12 @@ process_migration_content (void *cls, | |||
549 | mig_size++; | 514 | mig_size++; |
550 | for (pos = peer_head; NULL != pos; pos = pos->next) | 515 | for (pos = peer_head; NULL != pos; pos = pos->next) |
551 | { | 516 | { |
552 | if (NULL == pos->th) | 517 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
553 | { | 518 | "Preparing to push best content to peer %s\n", |
554 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 519 | GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer))); |
555 | "Preparing to push best content to peer %s\n", | 520 | if (GNUNET_YES == transmit_content (pos, |
556 | GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer))); | 521 | mb)) |
557 | if (GNUNET_YES == transmit_content (pos, mb)) | 522 | break; /* 'mb' was freed! */ |
558 | break; /* 'mb' was freed! */ | ||
559 | } | ||
560 | } | 523 | } |
561 | consider_gathering (); | 524 | consider_gathering (); |
562 | } | 525 | } |
@@ -580,9 +543,11 @@ gather_migration_blocks (void *cls) | |||
580 | "Asking datastore for content for replication (queue size: %u)\n", | 543 | "Asking datastore for content for replication (queue size: %u)\n", |
581 | mig_size); | 544 | mig_size); |
582 | value_found = GNUNET_NO; | 545 | value_found = GNUNET_NO; |
583 | mig_qe = | 546 | mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh, |
584 | GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX, | 547 | 0, |
585 | &process_migration_content, NULL); | 548 | UINT_MAX, |
549 | &process_migration_content, | ||
550 | NULL); | ||
586 | if (NULL == mig_qe) | 551 | if (NULL == mig_qe) |
587 | consider_gathering (); | 552 | consider_gathering (); |
588 | } | 553 | } |
@@ -640,19 +605,11 @@ GSF_push_stop_ (struct GSF_ConnectedPeer *peer) | |||
640 | break; | 605 | break; |
641 | if (NULL == pos) | 606 | if (NULL == pos) |
642 | return; | 607 | return; |
608 | if (NULL != pos->env) | ||
609 | GNUNET_MQ_send_cancel (pos->env); | ||
643 | GNUNET_CONTAINER_DLL_remove (peer_head, | 610 | GNUNET_CONTAINER_DLL_remove (peer_head, |
644 | peer_tail, | 611 | peer_tail, |
645 | pos); | 612 | pos); |
646 | if (NULL != pos->th) | ||
647 | { | ||
648 | GSF_peer_transmit_cancel_ (pos->th); | ||
649 | pos->th = NULL; | ||
650 | } | ||
651 | if (NULL != pos->msg) | ||
652 | { | ||
653 | GNUNET_free (pos->msg); | ||
654 | pos->msg = NULL; | ||
655 | } | ||
656 | GNUNET_free (pos); | 613 | GNUNET_free (pos); |
657 | } | 614 | } |
658 | 615 | ||
@@ -664,16 +621,21 @@ void | |||
664 | GSF_push_init_ () | 621 | GSF_push_init_ () |
665 | { | 622 | { |
666 | enabled = | 623 | enabled = |
667 | GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING"); | 624 | GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, |
625 | "FS", | ||
626 | "CONTENT_PUSHING"); | ||
668 | if (GNUNET_YES != enabled) | 627 | if (GNUNET_YES != enabled) |
669 | return; | 628 | return; |
670 | 629 | ||
671 | if (GNUNET_OK != | 630 | if (GNUNET_OK != |
672 | GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY", | 631 | GNUNET_CONFIGURATION_get_value_time (GSF_cfg, |
632 | "fs", | ||
633 | "MIN_MIGRATION_DELAY", | ||
673 | &min_migration_delay)) | 634 | &min_migration_delay)) |
674 | { | 635 | { |
675 | GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING, | 636 | GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING, |
676 | "fs", "MIN_MIGRATION_DELAY", | 637 | "fs", |
638 | "MIN_MIGRATION_DELAY", | ||
677 | _("time required, content pushing disabled")); | 639 | _("time required, content pushing disabled")); |
678 | return; | 640 | return; |
679 | } | 641 | } |