diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-07-31 21:23:23 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-07-31 21:23:23 +0000 |
commit | a78990b412db2c0ead2da8061c4f454f068991d1 (patch) | |
tree | 2e87adae62fd1ca3cd5a9f4c69248986f78bb106 | |
parent | 406c7d2d2d126c994a1fff13470b1f96439c6f9d (diff) | |
download | gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.tar.gz gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.zip |
converting FS to new MQ-based core API
-rw-r--r-- | src/fs/gnunet-service-fs.c | 228 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs.h | 26 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_cp.c | 612 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_cp.h | 94 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_pe.c | 165 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_pr.c | 93 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_pr.h | 62 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_push.c | 190 |
8 files changed, 519 insertions, 951 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 348bab092..bc0da09bc 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2009-2014 GNUnet e.V. | 3 | Copyright (C) 2009-2014, 2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -177,7 +177,7 @@ static struct GNUNET_LOAD_Value *datastore_get_load; | |||
177 | /** | 177 | /** |
178 | * Identity of this peer. | 178 | * Identity of this peer. |
179 | */ | 179 | */ |
180 | static struct GNUNET_PeerIdentity my_id; | 180 | struct GNUNET_PeerIdentity GSF_my_id; |
181 | 181 | ||
182 | 182 | ||
183 | /** | 183 | /** |
@@ -277,33 +277,26 @@ update_latencies (void *cls, | |||
277 | 277 | ||
278 | 278 | ||
279 | /** | 279 | /** |
280 | * Handle P2P "PUT" message. | 280 | * Check P2P "PUT" message. |
281 | * | 281 | * |
282 | * @param cls closure, always NULL | 282 | * @param cls closure with the `struct GSF_ConnectedPeer` |
283 | * @param other the other peer involved (sender or receiver, NULL | ||
284 | * for loopback messages where we are both sender and receiver) | ||
285 | * @param message the actual message | 283 | * @param message the actual message |
286 | * @return #GNUNET_OK to keep the connection open, | 284 | * @return #GNUNET_OK to keep the connection open, |
287 | * #GNUNET_SYSERR to close it (signal serious error) | 285 | * #GNUNET_SYSERR to close it (signal serious error) |
288 | */ | 286 | */ |
289 | static int | 287 | static int |
290 | handle_p2p_put (void *cls, | 288 | check_p2p_put (void *cls, |
291 | const struct GNUNET_PeerIdentity *other, | 289 | const struct PutMessage *put) |
292 | const struct GNUNET_MessageHeader *message) | ||
293 | { | 290 | { |
294 | struct GSF_ConnectedPeer *cp; | 291 | enum GNUNET_BLOCK_Type type; |
295 | 292 | ||
296 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 293 | type = ntohl (put->type); |
297 | "Received P2P PUT from %s\n", | 294 | if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) |
298 | GNUNET_i2s (other)); | ||
299 | cp = GSF_peer_get_ (other); | ||
300 | if (NULL == cp) | ||
301 | { | 295 | { |
302 | GNUNET_break (0); | 296 | GNUNET_break_op (0); |
303 | return GNUNET_OK; | 297 | return GNUNET_SYSERR; |
304 | } | 298 | } |
305 | GSF_cover_content_count++; | 299 | return GNUNET_OK; |
306 | return GSF_handle_p2p_content_ (cp, message); | ||
307 | } | 300 | } |
308 | 301 | ||
309 | 302 | ||
@@ -324,7 +317,8 @@ consider_request_for_forwarding (void *cls, | |||
324 | { | 317 | { |
325 | struct GSF_PendingRequest *pr = cls; | 318 | struct GSF_PendingRequest *pr = cls; |
326 | 319 | ||
327 | if (GNUNET_YES != GSF_pending_request_test_target_ (pr, peer)) | 320 | if (GNUNET_YES != |
321 | GSF_pending_request_test_target_ (pr, peer)) | ||
328 | { | 322 | { |
329 | #if INSANE_STATISTICS | 323 | #if INSANE_STATISTICS |
330 | GNUNET_STATISTICS_update (GSF_stats, | 324 | GNUNET_STATISTICS_update (GSF_stats, |
@@ -333,7 +327,8 @@ consider_request_for_forwarding (void *cls, | |||
333 | #endif | 327 | #endif |
334 | return; | 328 | return; |
335 | } | 329 | } |
336 | GSF_plan_add_ (cp, pr); | 330 | GSF_plan_add_ (cp, |
331 | pr); | ||
337 | } | 332 | } |
338 | 333 | ||
339 | 334 | ||
@@ -347,10 +342,10 @@ consider_request_for_forwarding (void *cls, | |||
347 | * @param pr the pending request we were processing | 342 | * @param pr the pending request we were processing |
348 | * @param result final datastore lookup result | 343 | * @param result final datastore lookup result |
349 | */ | 344 | */ |
350 | static void | 345 | void |
351 | consider_forwarding (void *cls, | 346 | GSF_consider_forwarding (void *cls, |
352 | struct GSF_PendingRequest *pr, | 347 | struct GSF_PendingRequest *pr, |
353 | enum GNUNET_BLOCK_EvaluationResult result) | 348 | enum GNUNET_BLOCK_EvaluationResult result) |
354 | { | 349 | { |
355 | if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) | 350 | if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) |
356 | return; /* we're done... */ | 351 | return; /* we're done... */ |
@@ -363,31 +358,44 @@ consider_forwarding (void *cls, | |||
363 | 358 | ||
364 | 359 | ||
365 | /** | 360 | /** |
366 | * Handle P2P "GET" request. | 361 | * Check P2P "GET" request. |
367 | * | 362 | * |
368 | * @param cls closure, always NULL | 363 | * @param cls closure |
369 | * @param other the other peer involved (sender or receiver, NULL | 364 | * @param gm the actual message |
370 | * for loopback messages where we are both sender and receiver) | ||
371 | * @param message the actual message | ||
372 | * @return #GNUNET_OK to keep the connection open, | 365 | * @return #GNUNET_OK to keep the connection open, |
373 | * #GNUNET_SYSERR to close it (signal serious error) | 366 | * #GNUNET_SYSERR to close it (signal serious error) |
374 | */ | 367 | */ |
375 | static int | 368 | static int |
376 | handle_p2p_get (void *cls, | 369 | check_p2p_get (void *cls, |
377 | const struct GNUNET_PeerIdentity *other, | 370 | const struct GetMessage *gm) |
378 | const struct GNUNET_MessageHeader *message) | ||
379 | { | 371 | { |
380 | struct GSF_PendingRequest *pr; | 372 | size_t msize; |
381 | 373 | unsigned int bm; | |
382 | pr = GSF_handle_p2p_query_ (other, | 374 | unsigned int bits; |
383 | message); | 375 | size_t bfsize; |
384 | if (NULL == pr) | 376 | |
385 | return GNUNET_OK; /* exists, identical to existing request, or malformed */ | 377 | msize = ntohs (gm->header.size); |
386 | GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES; | 378 | bm = ntohl (gm->hash_bitmap); |
387 | GSF_local_lookup_ (pr, | 379 | bits = 0; |
388 | &consider_forwarding, | 380 | while (bm > 0) |
389 | NULL); | 381 | { |
390 | return GNUNET_OK; | 382 | if (1 == (bm & 1)) |
383 | bits++; | ||
384 | bm >>= 1; | ||
385 | } | ||
386 | if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity)) | ||
387 | { | ||
388 | GNUNET_break_op (0); | ||
389 | return GNUNET_SYSERR; | ||
390 | } | ||
391 | bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity); | ||
392 | /* bfsize must be power of 2, check! */ | ||
393 | if (0 != ((bfsize - 1) & bfsize)) | ||
394 | { | ||
395 | GNUNET_break_op (0); | ||
396 | return GNUNET_SYSERR; | ||
397 | } | ||
398 | return GNUNET_OK; | ||
391 | } | 399 | } |
392 | 400 | ||
393 | 401 | ||
@@ -416,7 +424,8 @@ start_p2p_processing (void *cls, | |||
416 | prd = GSF_pending_request_get_data_ (pr); | 424 | prd = GSF_pending_request_get_data_ (pr); |
417 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 425 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
418 | "Finished database lookup for local request `%s' with result %d\n", | 426 | "Finished database lookup for local request `%s' with result %d\n", |
419 | GNUNET_h2s (&prd->query), result); | 427 | GNUNET_h2s (&prd->query), |
428 | result); | ||
420 | if (0 == prd->anonymity_level) | 429 | if (0 == prd->anonymity_level) |
421 | { | 430 | { |
422 | switch (prd->type) | 431 | switch (prd->type) |
@@ -439,7 +448,7 @@ start_p2p_processing (void *cls, | |||
439 | break; | 448 | break; |
440 | } | 449 | } |
441 | } | 450 | } |
442 | consider_forwarding (NULL, pr, result); | 451 | GSF_consider_forwarding (NULL, pr, result); |
443 | } | 452 | } |
444 | 453 | ||
445 | 454 | ||
@@ -538,7 +547,7 @@ shutdown_task (void *cls) | |||
538 | GSF_cadet_stop_server (); | 547 | GSF_cadet_stop_server (); |
539 | if (NULL != GSF_core) | 548 | if (NULL != GSF_core) |
540 | { | 549 | { |
541 | GNUNET_CORE_disconnect (GSF_core); | 550 | GNUNET_CORE_disconnecT (GSF_core); |
542 | GSF_core = NULL; | 551 | GSF_core = NULL; |
543 | } | 552 | } |
544 | if (NULL != GSF_ats) | 553 | if (NULL != GSF_ats) |
@@ -575,80 +584,7 @@ shutdown_task (void *cls) | |||
575 | 584 | ||
576 | 585 | ||
577 | /** | 586 | /** |
578 | * Function called for each pending request whenever a new | 587 | * Function called after GNUNET_CORE_connecT has succeeded |
579 | * peer connects, giving us a chance to decide about submitting | ||
580 | * the existing request to the new peer. | ||
581 | * | ||
582 | * @param cls the `struct GSF_ConnectedPeer` of the new peer | ||
583 | * @param key query for the request | ||
584 | * @param pr handle to the pending request | ||
585 | * @return #GNUNET_YES to continue to iterate | ||
586 | */ | ||
587 | static int | ||
588 | consider_peer_for_forwarding (void *cls, | ||
589 | const struct GNUNET_HashCode *key, | ||
590 | struct GSF_PendingRequest *pr) | ||
591 | { | ||
592 | struct GSF_ConnectedPeer *cp = cls; | ||
593 | struct GNUNET_PeerIdentity pid; | ||
594 | |||
595 | if (GNUNET_YES != | ||
596 | GSF_pending_request_test_active_ (pr)) | ||
597 | return GNUNET_YES; /* request is not actually active, skip! */ | ||
598 | GSF_connected_peer_get_identity_ (cp, &pid); | ||
599 | if (GNUNET_YES != | ||
600 | GSF_pending_request_test_target_ (pr, &pid)) | ||
601 | { | ||
602 | GNUNET_STATISTICS_update (GSF_stats, | ||
603 | gettext_noop ("# Loopback routes suppressed"), | ||
604 | 1, | ||
605 | GNUNET_NO); | ||
606 | return GNUNET_YES; | ||
607 | } | ||
608 | GSF_plan_add_ (cp, pr); | ||
609 | return GNUNET_YES; | ||
610 | } | ||
611 | |||
612 | |||
613 | /** | ||
614 | * Function called after the creation of a connected peer record is complete. | ||
615 | * | ||
616 | * @param cls closure (unused) | ||
617 | * @param cp handle to the newly created connected peer record | ||
618 | */ | ||
619 | static void | ||
620 | connected_peer_cb (void *cls, | ||
621 | struct GSF_ConnectedPeer *cp) | ||
622 | { | ||
623 | if (NULL == cp) | ||
624 | return; | ||
625 | GSF_iterate_pending_requests_ (&consider_peer_for_forwarding, | ||
626 | cp); | ||
627 | } | ||
628 | |||
629 | |||
630 | /** | ||
631 | * Method called whenever a given peer connects. | ||
632 | * | ||
633 | * @param cls closure, not used | ||
634 | * @param peer peer identity this notification is about | ||
635 | */ | ||
636 | static void | ||
637 | peer_connect_handler (void *cls, | ||
638 | const struct GNUNET_PeerIdentity *peer) | ||
639 | { | ||
640 | if (0 == | ||
641 | GNUNET_CRYPTO_cmp_peer_identity (&my_id, | ||
642 | peer)) | ||
643 | return; | ||
644 | GSF_peer_connect_handler_ (peer, | ||
645 | &connected_peer_cb, | ||
646 | NULL); | ||
647 | } | ||
648 | |||
649 | |||
650 | /** | ||
651 | * Function called after GNUNET_CORE_connect has succeeded | ||
652 | * (or failed for good). Note that the private key of the | 588 | * (or failed for good). Note that the private key of the |
653 | * peer is intentionally not exposed here; if you need it, | 589 | * peer is intentionally not exposed here; if you need it, |
654 | * your process should try to read the private key file | 590 | * your process should try to read the private key file |
@@ -661,7 +597,7 @@ static void | |||
661 | peer_init_handler (void *cls, | 597 | peer_init_handler (void *cls, |
662 | const struct GNUNET_PeerIdentity *my_identity) | 598 | const struct GNUNET_PeerIdentity *my_identity) |
663 | { | 599 | { |
664 | if (0 != GNUNET_CRYPTO_cmp_peer_identity (&my_id, | 600 | if (0 != GNUNET_CRYPTO_cmp_peer_identity (&GSF_my_id, |
665 | my_identity)) | 601 | my_identity)) |
666 | { | 602 | { |
667 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 603 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
@@ -681,18 +617,23 @@ static int | |||
681 | main_init (struct GNUNET_SERVER_Handle *server, | 617 | main_init (struct GNUNET_SERVER_Handle *server, |
682 | const struct GNUNET_CONFIGURATION_Handle *c) | 618 | const struct GNUNET_CONFIGURATION_Handle *c) |
683 | { | 619 | { |
684 | static const struct GNUNET_CORE_MessageHandler no_p2p_handlers[] = { | 620 | GNUNET_MQ_hd_var_size (p2p_get, |
685 | { NULL, 0, 0 } | 621 | GNUNET_MESSAGE_TYPE_FS_GET, |
622 | struct GetMessage); | ||
623 | GNUNET_MQ_hd_var_size (p2p_put, | ||
624 | GNUNET_MESSAGE_TYPE_FS_PUT, | ||
625 | struct PutMessage); | ||
626 | GNUNET_MQ_hd_fixed_size (p2p_migration_stop, | ||
627 | GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP, | ||
628 | struct MigrationStopMessage); | ||
629 | struct GNUNET_MQ_MessageHandler no_p2p_handlers[] = { | ||
630 | GNUNET_MQ_handler_end () | ||
686 | }; | 631 | }; |
687 | static const struct GNUNET_CORE_MessageHandler p2p_handlers[] = { | 632 | struct GNUNET_MQ_MessageHandler p2p_handlers[] = { |
688 | { &handle_p2p_get, | 633 | make_p2p_get_handler (NULL), |
689 | GNUNET_MESSAGE_TYPE_FS_GET, 0 }, | 634 | make_p2p_put_handler (NULL), |
690 | { &handle_p2p_put, | 635 | make_p2p_migration_stop_handler (NULL), |
691 | GNUNET_MESSAGE_TYPE_FS_PUT, 0 }, | 636 | GNUNET_MQ_handler_end () |
692 | { &GSF_handle_p2p_migration_stop_, | ||
693 | GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP, | ||
694 | sizeof (struct MigrationStopMessage) }, | ||
695 | { NULL, 0, 0 } | ||
696 | }; | 637 | }; |
697 | static const struct GNUNET_SERVER_MessageHandler handlers[] = { | 638 | static const struct GNUNET_SERVER_MessageHandler handlers[] = { |
698 | { &GNUNET_FS_handle_index_start, NULL, | 639 | { &GNUNET_FS_handle_index_start, NULL, |
@@ -735,28 +676,29 @@ main_init (struct GNUNET_SERVER_Handle *server, | |||
735 | GNUNET_free (keyfile); | 676 | GNUNET_free (keyfile); |
736 | GNUNET_assert (NULL != pk); | 677 | GNUNET_assert (NULL != pk); |
737 | GNUNET_CRYPTO_eddsa_key_get_public (pk, | 678 | GNUNET_CRYPTO_eddsa_key_get_public (pk, |
738 | &my_id.public_key); | 679 | &GSF_my_id.public_key); |
739 | 680 | ||
740 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 681 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
741 | "I am peer %s\n", | 682 | "I am peer %s\n", |
742 | GNUNET_i2s (&my_id)); | 683 | GNUNET_i2s (&GSF_my_id)); |
743 | GSF_core | 684 | GSF_core |
744 | = GNUNET_CORE_connect (GSF_cfg, NULL, | 685 | = GNUNET_CORE_connecT (GSF_cfg, |
686 | NULL, | ||
745 | &peer_init_handler, | 687 | &peer_init_handler, |
746 | &peer_connect_handler, | 688 | &GSF_peer_connect_handler, |
747 | &GSF_peer_disconnect_handler_, | 689 | &GSF_peer_disconnect_handler, |
748 | NULL, GNUNET_NO, | ||
749 | NULL, GNUNET_NO, | ||
750 | (GNUNET_YES == anon_p2p_off) | 690 | (GNUNET_YES == anon_p2p_off) |
751 | ? no_p2p_handlers | 691 | ? no_p2p_handlers |
752 | : p2p_handlers); | 692 | : p2p_handlers); |
753 | if (NULL == GSF_core) | 693 | if (NULL == GSF_core) |
754 | { | 694 | { |
755 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 695 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
756 | _("Failed to connect to `%s' service.\n"), "core"); | 696 | _("Failed to connect to `%s' service.\n"), |
697 | "core"); | ||
757 | return GNUNET_SYSERR; | 698 | return GNUNET_SYSERR; |
758 | } | 699 | } |
759 | GNUNET_SERVER_disconnect_notify (server, &GSF_client_disconnect_handler_, | 700 | GNUNET_SERVER_disconnect_notify (server, |
701 | &GSF_client_disconnect_handler_, | ||
760 | NULL); | 702 | NULL); |
761 | GNUNET_SERVER_add_handlers (server, handlers); | 703 | GNUNET_SERVER_add_handlers (server, handlers); |
762 | cover_age_task = | 704 | cover_age_task = |
diff --git a/src/fs/gnunet-service-fs.h b/src/fs/gnunet-service-fs.h index 92cb4088e..2a0f7ba29 100644 --- a/src/fs/gnunet-service-fs.h +++ b/src/fs/gnunet-service-fs.h | |||
@@ -223,6 +223,10 @@ extern struct GNUNET_TIME_Relative GSF_avg_latency; | |||
223 | */ | 223 | */ |
224 | extern struct GNUNET_ATS_PerformanceHandle *GSF_ats; | 224 | extern struct GNUNET_ATS_PerformanceHandle *GSF_ats; |
225 | 225 | ||
226 | /** | ||
227 | * Identity of this peer. | ||
228 | */ | ||
229 | extern struct GNUNET_PeerIdentity GSF_my_id; | ||
226 | 230 | ||
227 | /** | 231 | /** |
228 | * Typical priorities we're seeing from other peers right now. Since | 232 | * Typical priorities we're seeing from other peers right now. Since |
@@ -265,13 +269,29 @@ extern unsigned int GSF_datastore_queue_size; | |||
265 | 269 | ||
266 | 270 | ||
267 | /** | 271 | /** |
272 | * Function to be called after we're done processing | ||
273 | * replies from the local lookup. If the result status | ||
274 | * code indicates that there may be more replies, plan | ||
275 | * forwarding the request. | ||
276 | * | ||
277 | * @param cls closure (NULL) | ||
278 | * @param pr the pending request we were processing | ||
279 | * @param result final datastore lookup result | ||
280 | */ | ||
281 | void | ||
282 | GSF_consider_forwarding (void *cls, | ||
283 | struct GSF_PendingRequest *pr, | ||
284 | enum GNUNET_BLOCK_EvaluationResult result); | ||
285 | |||
286 | |||
287 | /** | ||
268 | * Test if the DATABASE (GET) load on this peer is too high | 288 | * Test if the DATABASE (GET) load on this peer is too high |
269 | * to even consider processing the query at | 289 | * to even consider processing the query at |
270 | * all. | 290 | * all. |
271 | * | 291 | * |
272 | * @return GNUNET_YES if the load is too high to do anything (load high) | 292 | * @return #GNUNET_YES if the load is too high to do anything (load high) |
273 | * GNUNET_NO to process normally (load normal) | 293 | * #GNUNET_NO to process normally (load normal) |
274 | * GNUNET_SYSERR to process for free (load low) | 294 | * #GNUNET_SYSERR to process for free (load low) |
275 | */ | 295 | */ |
276 | int | 296 | int |
277 | GSF_test_get_load_too_high_ (uint32_t priority); | 297 | GSF_test_get_load_too_high_ (uint32_t priority); |
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index bda33d766..3f7783ded 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2011 GNUnet e.V. | 3 | Copyright (C) 2011, 2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -78,19 +78,9 @@ struct GSF_PeerTransmitHandle | |||
78 | struct GNUNET_TIME_Absolute transmission_request_start_time; | 78 | struct GNUNET_TIME_Absolute transmission_request_start_time; |
79 | 79 | ||
80 | /** | 80 | /** |
81 | * Timeout for this request. | 81 | * Envelope with the actual message. |
82 | */ | 82 | */ |
83 | struct GNUNET_TIME_Absolute timeout; | 83 | struct GNUNET_MQ_Envelope *env; |
84 | |||
85 | /** | ||
86 | * Task called on timeout, or 0 for none. | ||
87 | */ | ||
88 | struct GNUNET_SCHEDULER_Task *timeout_task; | ||
89 | |||
90 | /** | ||
91 | * Function to call to get the actual message. | ||
92 | */ | ||
93 | GSF_GetMessageCallback gmc; | ||
94 | 84 | ||
95 | /** | 85 | /** |
96 | * Peer this request targets. | 86 | * Peer this request targets. |
@@ -98,16 +88,6 @@ struct GSF_PeerTransmitHandle | |||
98 | struct GSF_ConnectedPeer *cp; | 88 | struct GSF_ConnectedPeer *cp; |
99 | 89 | ||
100 | /** | 90 | /** |
101 | * Closure for @e gmc. | ||
102 | */ | ||
103 | void *gmc_cls; | ||
104 | |||
105 | /** | ||
106 | * Size of the message to be transmitted. | ||
107 | */ | ||
108 | size_t size; | ||
109 | |||
110 | /** | ||
111 | * #GNUNET_YES if this is a query, #GNUNET_NO for content. | 91 | * #GNUNET_YES if this is a query, #GNUNET_NO for content. |
112 | */ | 92 | */ |
113 | int is_query; | 93 | int is_query; |
@@ -147,9 +127,9 @@ struct GSF_DelayedHandle | |||
147 | struct GSF_ConnectedPeer *cp; | 127 | struct GSF_ConnectedPeer *cp; |
148 | 128 | ||
149 | /** | 129 | /** |
150 | * The PUT that was delayed. | 130 | * Envelope of the message that was delayed. |
151 | */ | 131 | */ |
152 | struct PutMessage *pm; | 132 | struct GNUNET_MQ_Envelope *env; |
153 | 133 | ||
154 | /** | 134 | /** |
155 | * Task for the delay. | 135 | * Task for the delay. |
@@ -235,11 +215,6 @@ struct GSF_ConnectedPeer | |||
235 | struct GSF_DelayedHandle *delayed_tail; | 215 | struct GSF_DelayedHandle *delayed_tail; |
236 | 216 | ||
237 | /** | 217 | /** |
238 | * Migration stop message in our queue, or NULL if we have none pending. | ||
239 | */ | ||
240 | struct GSF_PeerTransmitHandle *migration_pth; | ||
241 | |||
242 | /** | ||
243 | * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL). | 218 | * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL). |
244 | */ | 219 | */ |
245 | struct GNUNET_ATS_ReservationContext *rc; | 220 | struct GNUNET_ATS_ReservationContext *rc; |
@@ -256,9 +231,9 @@ struct GSF_ConnectedPeer | |||
256 | 231 | ||
257 | /** | 232 | /** |
258 | * Handle for an active request for transmission to this | 233 | * Handle for an active request for transmission to this |
259 | * peer, or NULL (if core queue was full). | 234 | * peer. |
260 | */ | 235 | */ |
261 | struct GNUNET_CORE_TransmitHandle *cth; | 236 | struct GNUNET_MQ_Handle *mq; |
262 | 237 | ||
263 | /** | 238 | /** |
264 | * Increase in traffic preference still to be submitted | 239 | * Increase in traffic preference still to be submitted |
@@ -267,14 +242,6 @@ struct GSF_ConnectedPeer | |||
267 | uint64_t inc_preference; | 242 | uint64_t inc_preference; |
268 | 243 | ||
269 | /** | 244 | /** |
270 | * Set to 1 if we're currently in the process of calling | ||
271 | * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is | ||
272 | * NULL, we should not call notify_transmit_ready for this | ||
273 | * handle right now). | ||
274 | */ | ||
275 | unsigned int cth_in_progress; | ||
276 | |||
277 | /** | ||
278 | * Number of entries in @e delayed_head DLL. | 245 | * Number of entries in @e delayed_head DLL. |
279 | */ | 246 | */ |
280 | unsigned int delay_queue_size; | 247 | unsigned int delay_queue_size; |
@@ -308,16 +275,6 @@ struct GSF_ConnectedPeer | |||
308 | int did_reserve; | 275 | int did_reserve; |
309 | 276 | ||
310 | /** | 277 | /** |
311 | * Function called when the creation of this record is complete. | ||
312 | */ | ||
313 | GSF_ConnectedPeerCreationCallback creation_cb; | ||
314 | |||
315 | /** | ||
316 | * Closure for @e creation_cb | ||
317 | */ | ||
318 | void *creation_cb_cls; | ||
319 | |||
320 | /** | ||
321 | * Handle to the PEERSTORE iterate request for peer respect value | 278 | * Handle to the PEERSTORE iterate request for peer respect value |
322 | */ | 279 | */ |
323 | struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req; | 280 | struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req; |
@@ -377,15 +334,10 @@ GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp) | |||
377 | /** | 334 | /** |
378 | * Core is ready to transmit to a peer, get the message. | 335 | * Core is ready to transmit to a peer, get the message. |
379 | * | 336 | * |
380 | * @param cls the `struct GSF_PeerTransmitHandle` of the message | 337 | * @param cp which peer to send a message to |
381 | * @param size number of bytes core is willing to take | ||
382 | * @param buf where to copy the message | ||
383 | * @return number of bytes copied to @a buf | ||
384 | */ | 338 | */ |
385 | static size_t | 339 | static void |
386 | peer_transmit_ready_cb (void *cls, | 340 | peer_transmit (struct GSF_ConnectedPeer *cp); |
387 | size_t size, | ||
388 | void *buf); | ||
389 | 341 | ||
390 | 342 | ||
391 | /** | 343 | /** |
@@ -418,8 +370,6 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth) | |||
418 | struct GNUNET_PeerIdentity target; | 370 | struct GNUNET_PeerIdentity target; |
419 | 371 | ||
420 | cp = pth->cp; | 372 | cp = pth->cp; |
421 | if ((NULL != cp->cth) || (0 != cp->cth_in_progress)) | ||
422 | return; /* already done */ | ||
423 | GNUNET_assert (0 != cp->ppd.pid); | 373 | GNUNET_assert (0 != cp->ppd.pid); |
424 | GNUNET_PEER_resolve (cp->ppd.pid, &target); | 374 | GNUNET_PEER_resolve (cp->ppd.pid, &target); |
425 | 375 | ||
@@ -449,52 +399,23 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth) | |||
449 | cp); | 399 | cp); |
450 | return; | 400 | return; |
451 | } | 401 | } |
452 | GNUNET_assert (NULL == cp->cth); | 402 | peer_transmit (cp); |
453 | cp->cth_in_progress++; | ||
454 | cp->cth = | ||
455 | GNUNET_CORE_notify_transmit_ready (GSF_core, | ||
456 | GNUNET_YES, | ||
457 | GNUNET_CORE_PRIO_BACKGROUND, | ||
458 | GNUNET_TIME_absolute_get_remaining (pth->timeout), | ||
459 | &target, | ||
460 | pth->size, | ||
461 | &peer_transmit_ready_cb, cp); | ||
462 | GNUNET_assert (NULL != cp->cth); | ||
463 | GNUNET_assert (0 < cp->cth_in_progress--); | ||
464 | } | 403 | } |
465 | 404 | ||
466 | 405 | ||
467 | /** | 406 | /** |
468 | * Core is ready to transmit to a peer, get the message. | 407 | * Core is ready to transmit to a peer, get the message. |
469 | * | 408 | * |
470 | * @param cls the `struct GSF_PeerTransmitHandle` of the message | 409 | * @param cp which peer to send a message to |
471 | * @param size number of bytes core is willing to take | ||
472 | * @param buf where to copy the message | ||
473 | * @return number of bytes copied to @a buf | ||
474 | */ | 410 | */ |
475 | static size_t | 411 | static void |
476 | peer_transmit_ready_cb (void *cls, | 412 | peer_transmit (struct GSF_ConnectedPeer *cp) |
477 | size_t size, | ||
478 | void *buf) | ||
479 | { | 413 | { |
480 | struct GSF_ConnectedPeer *cp = cls; | ||
481 | struct GSF_PeerTransmitHandle *pth = cp->pth_head; | 414 | struct GSF_PeerTransmitHandle *pth = cp->pth_head; |
482 | struct GSF_PeerTransmitHandle *pos; | 415 | struct GSF_PeerTransmitHandle *pos; |
483 | size_t ret; | ||
484 | 416 | ||
485 | cp->cth = NULL; | ||
486 | if (NULL == pth) | 417 | if (NULL == pth) |
487 | return 0; | 418 | return; |
488 | if (pth->size > size) | ||
489 | { | ||
490 | schedule_transmission (pth); | ||
491 | return 0; | ||
492 | } | ||
493 | if (NULL != pth->timeout_task) | ||
494 | { | ||
495 | GNUNET_SCHEDULER_cancel (pth->timeout_task); | ||
496 | pth->timeout_task = NULL; | ||
497 | } | ||
498 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, | 419 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, |
499 | cp->pth_tail, | 420 | cp->pth_tail, |
500 | pth); | 421 | pth); |
@@ -512,14 +433,14 @@ peer_transmit_ready_cb (void *cls, | |||
512 | GNUNET_LOAD_update (cp->ppd.transmission_delay, | 433 | GNUNET_LOAD_update (cp->ppd.transmission_delay, |
513 | GNUNET_TIME_absolute_get_duration | 434 | GNUNET_TIME_absolute_get_duration |
514 | (pth->transmission_request_start_time).rel_value_us); | 435 | (pth->transmission_request_start_time).rel_value_us); |
515 | ret = pth->gmc (pth->gmc_cls, size, buf); | 436 | GNUNET_MQ_send (cp->mq, |
437 | pth->env); | ||
438 | GNUNET_free (pth); | ||
516 | if (NULL != (pos = cp->pth_head)) | 439 | if (NULL != (pos = cp->pth_head)) |
517 | { | 440 | { |
518 | GNUNET_assert (pos != pth); | 441 | GNUNET_assert (pos != pth); |
519 | schedule_transmission (pos); | 442 | schedule_transmission (pos); |
520 | } | 443 | } |
521 | GNUNET_free (pth); | ||
522 | return ret; | ||
523 | } | 444 | } |
524 | 445 | ||
525 | 446 | ||
@@ -578,23 +499,10 @@ ats_reserve_callback (void *cls, | |||
578 | } | 499 | } |
579 | cp->did_reserve = GNUNET_YES; | 500 | cp->did_reserve = GNUNET_YES; |
580 | pth = cp->pth_head; | 501 | pth = cp->pth_head; |
581 | if ( (NULL != pth) && | 502 | if (NULL != pth) |
582 | (NULL == cp->cth) && | ||
583 | (0 == cp->cth_in_progress) ) | ||
584 | { | 503 | { |
585 | /* reservation success, try transmission now! */ | 504 | /* reservation success, try transmission now! */ |
586 | cp->cth_in_progress++; | 505 | peer_transmit (cp); |
587 | cp->cth = | ||
588 | GNUNET_CORE_notify_transmit_ready (GSF_core, | ||
589 | GNUNET_YES, | ||
590 | GNUNET_CORE_PRIO_BACKGROUND, | ||
591 | GNUNET_TIME_absolute_get_remaining (pth->timeout), | ||
592 | peer, | ||
593 | pth->size, | ||
594 | &peer_transmit_ready_cb, | ||
595 | cp); | ||
596 | GNUNET_assert (NULL != cp->cth); | ||
597 | GNUNET_assert (0 < cp->cth_in_progress--); | ||
598 | } | 506 | } |
599 | } | 507 | } |
600 | 508 | ||
@@ -614,11 +522,13 @@ peer_respect_cb (void *cls, | |||
614 | struct GSF_ConnectedPeer *cp = cls; | 522 | struct GSF_ConnectedPeer *cp = cls; |
615 | 523 | ||
616 | GNUNET_assert (NULL != cp->respect_iterate_req); | 524 | GNUNET_assert (NULL != cp->respect_iterate_req); |
617 | if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size)) | 525 | if ( (NULL != record) && |
618 | cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value); | 526 | (sizeof (cp->disk_respect) == record->value_size)) |
527 | { | ||
528 | cp->disk_respect = *((uint32_t *)record->value); | ||
529 | cp->ppd.respect += *((uint32_t *)record->value); | ||
530 | } | ||
619 | GSF_push_start_ (cp); | 531 | GSF_push_start_ (cp); |
620 | if (NULL != cp->creation_cb) | ||
621 | cp->creation_cb (cp->creation_cb_cls, cp); | ||
622 | if (NULL != record) | 532 | if (NULL != record) |
623 | GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); | 533 | GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); |
624 | cp->respect_iterate_req = NULL; | 534 | cp->respect_iterate_req = NULL; |
@@ -626,25 +536,68 @@ peer_respect_cb (void *cls, | |||
626 | 536 | ||
627 | 537 | ||
628 | /** | 538 | /** |
539 | * Function called for each pending request whenever a new | ||
540 | * peer connects, giving us a chance to decide about submitting | ||
541 | * the existing request to the new peer. | ||
542 | * | ||
543 | * @param cls the `struct GSF_ConnectedPeer` of the new peer | ||
544 | * @param key query for the request | ||
545 | * @param pr handle to the pending request | ||
546 | * @return #GNUNET_YES to continue to iterate | ||
547 | */ | ||
548 | static int | ||
549 | consider_peer_for_forwarding (void *cls, | ||
550 | const struct GNUNET_HashCode *key, | ||
551 | struct GSF_PendingRequest *pr) | ||
552 | { | ||
553 | struct GSF_ConnectedPeer *cp = cls; | ||
554 | struct GNUNET_PeerIdentity pid; | ||
555 | |||
556 | if (GNUNET_YES != | ||
557 | GSF_pending_request_test_active_ (pr)) | ||
558 | return GNUNET_YES; /* request is not actually active, skip! */ | ||
559 | GSF_connected_peer_get_identity_ (cp, &pid); | ||
560 | if (GNUNET_YES != | ||
561 | GSF_pending_request_test_target_ (pr, &pid)) | ||
562 | { | ||
563 | GNUNET_STATISTICS_update (GSF_stats, | ||
564 | gettext_noop ("# Loopback routes suppressed"), | ||
565 | 1, | ||
566 | GNUNET_NO); | ||
567 | return GNUNET_YES; | ||
568 | } | ||
569 | GSF_plan_add_ (cp, pr); | ||
570 | return GNUNET_YES; | ||
571 | } | ||
572 | |||
573 | |||
574 | /** | ||
629 | * A peer connected to us. Setup the connected peer | 575 | * A peer connected to us. Setup the connected peer |
630 | * records. | 576 | * records. |
631 | * | 577 | * |
578 | * @param cls NULL | ||
632 | * @param peer identity of peer that connected | 579 | * @param peer identity of peer that connected |
633 | * @param creation_cb callback function when the record is created. | 580 | * @param mq message queue for talking to @a peer |
634 | * @param creation_cb_cls closure for @creation_cb | 581 | * @return our internal handle for the peer |
635 | */ | 582 | */ |
636 | void | 583 | void * |
637 | GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, | 584 | GSF_peer_connect_handler (void *cls, |
638 | GSF_ConnectedPeerCreationCallback creation_cb, | 585 | const struct GNUNET_PeerIdentity *peer, |
639 | void *creation_cb_cls) | 586 | struct GNUNET_MQ_Handle *mq) |
640 | { | 587 | { |
641 | struct GSF_ConnectedPeer *cp; | 588 | struct GSF_ConnectedPeer *cp; |
642 | 589 | ||
590 | if (0 == | ||
591 | GNUNET_CRYPTO_cmp_peer_identity (&GSF_my_id, | ||
592 | peer)) | ||
593 | return NULL; | ||
643 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 594 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
644 | "Connected to peer %s\n", | 595 | "Connected to peer %s\n", |
645 | GNUNET_i2s (peer)); | 596 | GNUNET_i2s (peer)); |
646 | cp = GNUNET_new (struct GSF_ConnectedPeer); | 597 | cp = GNUNET_new (struct GSF_ConnectedPeer); |
647 | cp->ppd.pid = GNUNET_PEER_intern (peer); | 598 | cp->ppd.pid = GNUNET_PEER_intern (peer); |
599 | cp->ppd.peer = peer; | ||
600 | cp->mq = mq; | ||
648 | cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO); | 601 | cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO); |
649 | cp->rc = | 602 | cp->rc = |
650 | GNUNET_ATS_reserve_bandwidth (GSF_ats, | 603 | GNUNET_ATS_reserve_bandwidth (GSF_ats, |
@@ -662,14 +615,17 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, | |||
662 | gettext_noop ("# peers connected"), | 615 | gettext_noop ("# peers connected"), |
663 | GNUNET_CONTAINER_multipeermap_size (cp_map), | 616 | GNUNET_CONTAINER_multipeermap_size (cp_map), |
664 | GNUNET_NO); | 617 | GNUNET_NO); |
665 | cp->creation_cb = creation_cb; | 618 | cp->respect_iterate_req |
666 | cp->creation_cb_cls = creation_cb_cls; | 619 | = GNUNET_PEERSTORE_iterate (peerstore, |
667 | cp->respect_iterate_req = | 620 | "fs", |
668 | GNUNET_PEERSTORE_iterate (peerstore, "fs", | 621 | peer, |
669 | peer, "respect", | 622 | "respect", |
670 | GNUNET_TIME_UNIT_FOREVER_REL, | 623 | GNUNET_TIME_UNIT_FOREVER_REL, |
671 | &peer_respect_cb, | 624 | &peer_respect_cb, |
672 | cp); | 625 | cp); |
626 | GSF_iterate_pending_requests_ (&consider_peer_for_forwarding, | ||
627 | cp); | ||
628 | return cp; | ||
673 | } | 629 | } |
674 | 630 | ||
675 | 631 | ||
@@ -714,38 +670,25 @@ GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer) | |||
714 | 670 | ||
715 | 671 | ||
716 | /** | 672 | /** |
717 | * Handle P2P "MIGRATION_STOP" message. | 673 | * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message. |
718 | * | 674 | * |
719 | * @param cls closure, always NULL | 675 | * @param cls closure, the `struct GSF_ConnectedPeer` |
720 | * @param other the other peer involved (sender or receiver, NULL | 676 | * @param msm the actual message |
721 | * for loopback messages where we are both sender and receiver) | ||
722 | * @param message the actual message | ||
723 | * @return #GNUNET_OK to keep the connection open, | ||
724 | * #GNUNET_SYSERR to close it (signal serious error) | ||
725 | */ | 677 | */ |
726 | int | 678 | void |
727 | GSF_handle_p2p_migration_stop_ (void *cls, | 679 | handle_p2p_migration_stop (void *cls, |
728 | const struct GNUNET_PeerIdentity *other, | 680 | const struct MigrationStopMessage *msm) |
729 | const struct GNUNET_MessageHeader *message) | ||
730 | { | 681 | { |
731 | struct GSF_ConnectedPeer *cp; | 682 | struct GSF_ConnectedPeer *cp = cls; |
732 | const struct MigrationStopMessage *msm; | ||
733 | struct GNUNET_TIME_Relative bt; | 683 | struct GNUNET_TIME_Relative bt; |
734 | 684 | ||
735 | msm = (const struct MigrationStopMessage *) message; | ||
736 | cp = GSF_peer_get_ (other); | ||
737 | if (NULL == cp) | ||
738 | { | ||
739 | GNUNET_break (0); | ||
740 | return GNUNET_OK; | ||
741 | } | ||
742 | GNUNET_STATISTICS_update (GSF_stats, | 685 | GNUNET_STATISTICS_update (GSF_stats, |
743 | gettext_noop ("# migration stop messages received"), | 686 | gettext_noop ("# migration stop messages received"), |
744 | 1, GNUNET_NO); | 687 | 1, GNUNET_NO); |
745 | bt = GNUNET_TIME_relative_ntoh (msm->duration); | 688 | bt = GNUNET_TIME_relative_ntoh (msm->duration); |
746 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 689 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
747 | _("Migration of content to peer `%s' blocked for %s\n"), | 690 | _("Migration of content to peer `%s' blocked for %s\n"), |
748 | GNUNET_i2s (other), | 691 | GNUNET_i2s (cp->ppd.peer), |
749 | GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES)); | 692 | GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES)); |
750 | cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt); | 693 | cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt); |
751 | if ( (NULL == cp->mig_revive_task) && | 694 | if ( (NULL == cp->mig_revive_task) && |
@@ -756,46 +699,6 @@ GSF_handle_p2p_migration_stop_ (void *cls, | |||
756 | GNUNET_SCHEDULER_add_delayed (bt, | 699 | GNUNET_SCHEDULER_add_delayed (bt, |
757 | &revive_migration, cp); | 700 | &revive_migration, cp); |
758 | } | 701 | } |
759 | return GNUNET_OK; | ||
760 | } | ||
761 | |||
762 | |||
763 | /** | ||
764 | * Copy reply and free put message. | ||
765 | * | ||
766 | * @param cls the `struct PutMessage` | ||
767 | * @param buf_size number of bytes available in @a buf | ||
768 | * @param buf where to copy the message, NULL on error (peer disconnect) | ||
769 | * @return number of bytes copied to @a buf, can be 0 (without indicating an error) | ||
770 | */ | ||
771 | static size_t | ||
772 | copy_reply (void *cls, | ||
773 | size_t buf_size, | ||
774 | void *buf) | ||
775 | { | ||
776 | struct PutMessage *pm = cls; | ||
777 | size_t size; | ||
778 | |||
779 | if (NULL != buf) | ||
780 | { | ||
781 | GNUNET_assert (buf_size >= ntohs (pm->header.size)); | ||
782 | size = ntohs (pm->header.size); | ||
783 | GNUNET_memcpy (buf, pm, size); | ||
784 | GNUNET_STATISTICS_update (GSF_stats, | ||
785 | gettext_noop ("# replies transmitted to other peers"), | ||
786 | 1, | ||
787 | GNUNET_NO); | ||
788 | } | ||
789 | else | ||
790 | { | ||
791 | size = 0; | ||
792 | GNUNET_STATISTICS_update (GSF_stats, | ||
793 | gettext_noop ("# replies dropped"), | ||
794 | 1, | ||
795 | GNUNET_NO); | ||
796 | } | ||
797 | GNUNET_free (pm); | ||
798 | return size; | ||
799 | } | 702 | } |
800 | 703 | ||
801 | 704 | ||
@@ -886,13 +789,10 @@ transmit_delayed_now (void *cls) | |||
886 | cp->delayed_tail, | 789 | cp->delayed_tail, |
887 | dh); | 790 | dh); |
888 | cp->delay_queue_size--; | 791 | cp->delay_queue_size--; |
889 | (void) GSF_peer_transmit_ (cp, | 792 | GSF_peer_transmit_ (cp, |
890 | GNUNET_NO, | 793 | GNUNET_NO, |
891 | UINT32_MAX, | 794 | UINT32_MAX, |
892 | REPLY_TIMEOUT, | 795 | dh->env); |
893 | dh->msize, | ||
894 | ©_reply, | ||
895 | dh->pm); | ||
896 | GNUNET_free (dh); | 796 | GNUNET_free (dh); |
897 | } | 797 | } |
898 | 798 | ||
@@ -954,6 +854,7 @@ handle_p2p_reply (void *cls, | |||
954 | struct PeerRequest *peerreq = cls; | 854 | struct PeerRequest *peerreq = cls; |
955 | struct GSF_ConnectedPeer *cp = peerreq->cp; | 855 | struct GSF_ConnectedPeer *cp = peerreq->cp; |
956 | struct GSF_PendingRequestData *prd; | 856 | struct GSF_PendingRequestData *prd; |
857 | struct GNUNET_MQ_Envelope *env; | ||
957 | struct PutMessage *pm; | 858 | struct PutMessage *pm; |
958 | size_t msize; | 859 | size_t msize; |
959 | 860 | ||
@@ -1000,12 +901,14 @@ handle_p2p_reply (void *cls, | |||
1000 | GSF_cover_content_count -= (reply_anonymity_level - 1); | 901 | GSF_cover_content_count -= (reply_anonymity_level - 1); |
1001 | } | 902 | } |
1002 | 903 | ||
1003 | pm = GNUNET_malloc (msize); | 904 | env = GNUNET_MQ_msg_extra (pm, |
1004 | pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); | 905 | data_len, |
1005 | pm->header.size = htons (msize); | 906 | GNUNET_MESSAGE_TYPE_FS_PUT); |
1006 | pm->type = htonl (type); | 907 | pm->type = htonl (type); |
1007 | pm->expiration = GNUNET_TIME_absolute_hton (expiration); | 908 | pm->expiration = GNUNET_TIME_absolute_hton (expiration); |
1008 | GNUNET_memcpy (&pm[1], data, data_len); | 909 | GNUNET_memcpy (&pm[1], |
910 | data, | ||
911 | data_len); | ||
1009 | if ( (UINT32_MAX != reply_anonymity_level) && | 912 | if ( (UINT32_MAX != reply_anonymity_level) && |
1010 | (0 != reply_anonymity_level) && | 913 | (0 != reply_anonymity_level) && |
1011 | (GNUNET_YES == GSF_enable_randomized_delays) ) | 914 | (GNUNET_YES == GSF_enable_randomized_delays) ) |
@@ -1014,7 +917,7 @@ handle_p2p_reply (void *cls, | |||
1014 | 917 | ||
1015 | dh = GNUNET_new (struct GSF_DelayedHandle); | 918 | dh = GNUNET_new (struct GSF_DelayedHandle); |
1016 | dh->cp = cp; | 919 | dh->cp = cp; |
1017 | dh->pm = pm; | 920 | dh->env = env; |
1018 | dh->msize = msize; | 921 | dh->msize = msize; |
1019 | GNUNET_CONTAINER_DLL_insert (cp->delayed_head, | 922 | GNUNET_CONTAINER_DLL_insert (cp->delayed_head, |
1020 | cp->delayed_tail, | 923 | cp->delayed_tail, |
@@ -1027,13 +930,10 @@ handle_p2p_reply (void *cls, | |||
1027 | } | 930 | } |
1028 | else | 931 | else |
1029 | { | 932 | { |
1030 | (void) GSF_peer_transmit_ (cp, | 933 | GSF_peer_transmit_ (cp, |
1031 | GNUNET_NO, | 934 | GNUNET_NO, |
1032 | UINT32_MAX, | 935 | UINT32_MAX, |
1033 | REPLY_TIMEOUT, | 936 | env); |
1034 | msize, | ||
1035 | ©_reply, | ||
1036 | pm); | ||
1037 | } | 937 | } |
1038 | if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval) | 938 | if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval) |
1039 | return; | 939 | return; |
@@ -1265,23 +1165,20 @@ test_exist_cb (void *cls, | |||
1265 | * process replies properly. Does not initiate forwarding or | 1165 | * process replies properly. Does not initiate forwarding or |
1266 | * local database lookups. | 1166 | * local database lookups. |
1267 | * | 1167 | * |
1268 | * @param other the other peer involved (sender or receiver, NULL | 1168 | * @param cls the other peer involved (sender of the message) |
1269 | * for loopback messages where we are both sender and receiver) | 1169 | * @param gm the GET message |
1270 | * @param message the actual message | ||
1271 | * @return pending request handle, NULL on error | ||
1272 | */ | 1170 | */ |
1273 | struct GSF_PendingRequest * | 1171 | void |
1274 | GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | 1172 | handle_p2p_get (void *cls, |
1275 | const struct GNUNET_MessageHeader *message) | 1173 | const struct GetMessage *gm) |
1276 | { | 1174 | { |
1175 | struct GSF_ConnectedPeer *cps = cls; | ||
1277 | struct PeerRequest *peerreq; | 1176 | struct PeerRequest *peerreq; |
1278 | struct GSF_PendingRequest *pr; | 1177 | struct GSF_PendingRequest *pr; |
1279 | struct GSF_ConnectedPeer *cp; | 1178 | struct GSF_ConnectedPeer *cp; |
1280 | struct GSF_ConnectedPeer *cps; | ||
1281 | const struct GNUNET_PeerIdentity *target; | 1179 | const struct GNUNET_PeerIdentity *target; |
1282 | enum GSF_PendingRequestOptions options; | 1180 | enum GSF_PendingRequestOptions options; |
1283 | uint16_t msize; | 1181 | uint16_t msize; |
1284 | const struct GetMessage *gm; | ||
1285 | unsigned int bits; | 1182 | unsigned int bits; |
1286 | const struct GNUNET_PeerIdentity *opt; | 1183 | const struct GNUNET_PeerIdentity *opt; |
1287 | uint32_t bm; | 1184 | uint32_t bm; |
@@ -1291,18 +1188,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1291 | GNUNET_PEER_Id spid; | 1188 | GNUNET_PEER_Id spid; |
1292 | const struct GSF_PendingRequestData *prd; | 1189 | const struct GSF_PendingRequestData *prd; |
1293 | 1190 | ||
1294 | msize = ntohs (message->size); | 1191 | msize = ntohs (gm->header.size); |
1295 | if (msize < sizeof (struct GetMessage)) | ||
1296 | { | ||
1297 | GNUNET_break_op (0); | ||
1298 | return NULL; | ||
1299 | } | ||
1300 | GNUNET_STATISTICS_update (GSF_stats, | ||
1301 | gettext_noop | ||
1302 | ("# GET requests received (from other peers)"), | ||
1303 | 1, | ||
1304 | GNUNET_NO); | ||
1305 | gm = (const struct GetMessage *) message; | ||
1306 | tec.type = ntohl (gm->type); | 1192 | tec.type = ntohl (gm->type); |
1307 | bm = ntohl (gm->hash_bitmap); | 1193 | bm = ntohl (gm->hash_bitmap); |
1308 | bits = 0; | 1194 | bits = 0; |
@@ -1312,32 +1198,16 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1312 | bits++; | 1198 | bits++; |
1313 | bm >>= 1; | 1199 | bm >>= 1; |
1314 | } | 1200 | } |
1315 | if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity)) | ||
1316 | { | ||
1317 | GNUNET_break_op (0); | ||
1318 | return NULL; | ||
1319 | } | ||
1320 | opt = (const struct GNUNET_PeerIdentity *) &gm[1]; | 1201 | opt = (const struct GNUNET_PeerIdentity *) &gm[1]; |
1321 | bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity); | 1202 | bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity); |
1322 | /* bfsize must be power of 2, check! */ | 1203 | GNUNET_STATISTICS_update (GSF_stats, |
1323 | if (0 != ((bfsize - 1) & bfsize)) | 1204 | gettext_noop |
1324 | { | 1205 | ("# GET requests received (from other peers)"), |
1325 | GNUNET_break_op (0); | 1206 | 1, |
1326 | return NULL; | 1207 | GNUNET_NO); |
1327 | } | ||
1328 | GSF_cover_query_count++; | 1208 | GSF_cover_query_count++; |
1329 | bm = ntohl (gm->hash_bitmap); | 1209 | bm = ntohl (gm->hash_bitmap); |
1330 | bits = 0; | 1210 | bits = 0; |
1331 | cps = GSF_peer_get_ (other); | ||
1332 | if (NULL == cps) | ||
1333 | { | ||
1334 | /* peer must have just disconnected */ | ||
1335 | GNUNET_STATISTICS_update (GSF_stats, | ||
1336 | gettext_noop | ||
1337 | ("# requests dropped due to initiator not being connected"), | ||
1338 | 1, GNUNET_NO); | ||
1339 | return NULL; | ||
1340 | } | ||
1341 | if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) | 1211 | if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) |
1342 | cp = GSF_peer_get_ (&opt[bits++]); | 1212 | cp = GSF_peer_get_ (&opt[bits++]); |
1343 | else | 1213 | else |
@@ -1352,24 +1222,24 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1352 | else | 1222 | else |
1353 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1223 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1354 | "Failed to find peer `%s' in connection set. Dropping query.\n", | 1224 | "Failed to find peer `%s' in connection set. Dropping query.\n", |
1355 | GNUNET_i2s (other)); | 1225 | GNUNET_i2s (cps->ppd.peer)); |
1356 | GNUNET_STATISTICS_update (GSF_stats, | 1226 | GNUNET_STATISTICS_update (GSF_stats, |
1357 | gettext_noop | 1227 | gettext_noop |
1358 | ("# requests dropped due to missing reverse route"), | 1228 | ("# requests dropped due to missing reverse route"), |
1359 | 1, | 1229 | 1, |
1360 | GNUNET_NO); | 1230 | GNUNET_NO); |
1361 | return NULL; | 1231 | return; |
1362 | } | 1232 | } |
1363 | if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER) | 1233 | if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER) |
1364 | { | 1234 | { |
1365 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1235 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1366 | "Peer `%s' has too many replies queued already. Dropping query.\n", | 1236 | "Peer `%s' has too many replies queued already. Dropping query.\n", |
1367 | GNUNET_i2s (other)); | 1237 | GNUNET_i2s (cps->ppd.peer)); |
1368 | GNUNET_STATISTICS_update (GSF_stats, | 1238 | GNUNET_STATISTICS_update (GSF_stats, |
1369 | gettext_noop ("# requests dropped due to full reply queue"), | 1239 | gettext_noop ("# requests dropped due to full reply queue"), |
1370 | 1, | 1240 | 1, |
1371 | GNUNET_NO); | 1241 | GNUNET_NO); |
1372 | return NULL; | 1242 | return; |
1373 | } | 1243 | } |
1374 | /* note that we can really only check load here since otherwise | 1244 | /* note that we can really only check load here since otherwise |
1375 | * peers could find out that we are overloaded by not being | 1245 | * peers could find out that we are overloaded by not being |
@@ -1380,14 +1250,14 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1380 | { | 1250 | { |
1381 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1251 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1382 | "Dropping query from `%s', this peer is too busy.\n", | 1252 | "Dropping query from `%s', this peer is too busy.\n", |
1383 | GNUNET_i2s (other)); | 1253 | GNUNET_i2s (cps->ppd.peer)); |
1384 | return NULL; | 1254 | return; |
1385 | } | 1255 | } |
1386 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1256 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1387 | "Received request for `%s' of type %u from peer `%s' with flags %u\n", | 1257 | "Received request for `%s' of type %u from peer `%s' with flags %u\n", |
1388 | GNUNET_h2s (&gm->query), | 1258 | GNUNET_h2s (&gm->query), |
1389 | (unsigned int) tec.type, | 1259 | (unsigned int) tec.type, |
1390 | GNUNET_i2s (other), | 1260 | GNUNET_i2s (cps->ppd.peer), |
1391 | (unsigned int) bm); | 1261 | (unsigned int) bm); |
1392 | target = | 1262 | target = |
1393 | (0 != | 1263 | (0 != |
@@ -1403,7 +1273,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1403 | * so at best indirect the query */ | 1273 | * so at best indirect the query */ |
1404 | tec.priority = 0; | 1274 | tec.priority = 0; |
1405 | options |= GSF_PRO_FORWARD_ONLY; | 1275 | options |= GSF_PRO_FORWARD_ONLY; |
1406 | spid = GNUNET_PEER_intern (other); | 1276 | spid = GNUNET_PEER_intern (cps->ppd.peer); |
1407 | GNUNET_assert (0 != spid); | 1277 | GNUNET_assert (0 != spid); |
1408 | } | 1278 | } |
1409 | tec.ttl = bound_ttl (ntohl (gm->ttl), | 1279 | tec.ttl = bound_ttl (ntohl (gm->ttl), |
@@ -1412,11 +1282,12 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1412 | ttl_decrement = | 1282 | ttl_decrement = |
1413 | 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | 1283 | 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, |
1414 | TTL_DECREMENT); | 1284 | TTL_DECREMENT); |
1415 | if ((tec.ttl < 0) && (((int32_t) (tec.ttl - ttl_decrement)) > 0)) | 1285 | if ( (tec.ttl < 0) && |
1286 | (((int32_t) (tec.ttl - ttl_decrement)) > 0) ) | ||
1416 | { | 1287 | { |
1417 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1288 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1418 | "Dropping query from `%s' due to TTL underflow (%d - %u).\n", | 1289 | "Dropping query from `%s' due to TTL underflow (%d - %u).\n", |
1419 | GNUNET_i2s (other), | 1290 | GNUNET_i2s (cps->ppd.peer), |
1420 | tec.ttl, | 1291 | tec.ttl, |
1421 | ttl_decrement); | 1292 | ttl_decrement); |
1422 | GNUNET_STATISTICS_update (GSF_stats, | 1293 | GNUNET_STATISTICS_update (GSF_stats, |
@@ -1424,7 +1295,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1424 | ("# requests dropped due TTL underflow"), 1, | 1295 | ("# requests dropped due TTL underflow"), 1, |
1425 | GNUNET_NO); | 1296 | GNUNET_NO); |
1426 | /* integer underflow => drop (should be very rare)! */ | 1297 | /* integer underflow => drop (should be very rare)! */ |
1427 | return NULL; | 1298 | return; |
1428 | } | 1299 | } |
1429 | tec.ttl -= ttl_decrement; | 1300 | tec.ttl -= ttl_decrement; |
1430 | 1301 | ||
@@ -1435,7 +1306,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1435 | &test_exist_cb, | 1306 | &test_exist_cb, |
1436 | &tec); | 1307 | &tec); |
1437 | if (GNUNET_YES == tec.finished) | 1308 | if (GNUNET_YES == tec.finished) |
1438 | return NULL; /* merged into existing request, we're done */ | 1309 | return; /* merged into existing request, we're done */ |
1439 | 1310 | ||
1440 | peerreq = GNUNET_new (struct PeerRequest); | 1311 | peerreq = GNUNET_new (struct PeerRequest); |
1441 | peerreq->cp = cp; | 1312 | peerreq->cp = cp; |
@@ -1452,7 +1323,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1452 | (uint32_t) tec.priority, | 1323 | (uint32_t) tec.priority, |
1453 | tec.ttl, | 1324 | tec.ttl, |
1454 | spid, | 1325 | spid, |
1455 | GNUNET_PEER_intern (other), | 1326 | GNUNET_PEER_intern (cps->ppd.peer), |
1456 | NULL, 0, /* replies_seen */ | 1327 | NULL, 0, /* replies_seen */ |
1457 | &handle_p2p_reply, | 1328 | &handle_p2p_reply, |
1458 | peerreq); | 1329 | peerreq); |
@@ -1472,43 +1343,10 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1472 | gettext_noop ("# P2P searches active"), | 1343 | gettext_noop ("# P2P searches active"), |
1473 | 1, | 1344 | 1, |
1474 | GNUNET_NO); | 1345 | GNUNET_NO); |
1475 | return pr; | 1346 | GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES; |
1476 | } | 1347 | GSF_local_lookup_ (pr, |
1477 | 1348 | &GSF_consider_forwarding, | |
1478 | 1349 | NULL); | |
1479 | /** | ||
1480 | * Function called if there has been a timeout trying to satisfy | ||
1481 | * a transmission request. | ||
1482 | * | ||
1483 | * @param cls the `struct GSF_PeerTransmitHandle` of the request | ||
1484 | */ | ||
1485 | static void | ||
1486 | peer_transmit_timeout (void *cls) | ||
1487 | { | ||
1488 | struct GSF_PeerTransmitHandle *pth = cls; | ||
1489 | struct GSF_ConnectedPeer *cp; | ||
1490 | |||
1491 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1492 | "Timeout trying to transmit to other peer\n"); | ||
1493 | pth->timeout_task = NULL; | ||
1494 | cp = pth->cp; | ||
1495 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, | ||
1496 | cp->pth_tail, | ||
1497 | pth); | ||
1498 | if (GNUNET_YES == pth->is_query) | ||
1499 | GNUNET_assert (0 < cp->ppd.pending_queries--); | ||
1500 | else if (GNUNET_NO == pth->is_query) | ||
1501 | GNUNET_assert (0 < cp->ppd.pending_replies--); | ||
1502 | GNUNET_LOAD_update (cp->ppd.transmission_delay, | ||
1503 | UINT64_MAX); | ||
1504 | if (NULL != cp->cth) | ||
1505 | { | ||
1506 | GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); | ||
1507 | cp->cth = NULL; | ||
1508 | } | ||
1509 | pth->gmc (pth->gmc_cls, 0, NULL); | ||
1510 | GNUNET_assert (0 == cp->cth_in_progress); | ||
1511 | GNUNET_free (pth); | ||
1512 | } | 1350 | } |
1513 | 1351 | ||
1514 | 1352 | ||
@@ -1520,19 +1358,15 @@ peer_transmit_timeout (void *cls) | |||
1520 | * @param cp target peer | 1358 | * @param cp target peer |
1521 | * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR) | 1359 | * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR) |
1522 | * @param priority how important is this request? | 1360 | * @param priority how important is this request? |
1523 | * @param timeout when does this request timeout (call gmc with error) | 1361 | * @param timeout when does this request timeout |
1524 | * @param size number of bytes we would like to send to the peer | 1362 | * @param size number of bytes we would like to send to the peer |
1525 | * @param gmc function to call to get the message | 1363 | * @param env message to send |
1526 | * @param gmc_cls closure for @a gmc | ||
1527 | * @return handle to cancel request | ||
1528 | */ | 1364 | */ |
1529 | struct GSF_PeerTransmitHandle * | 1365 | void |
1530 | GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, | 1366 | GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, |
1531 | int is_query, | 1367 | int is_query, |
1532 | uint32_t priority, | 1368 | uint32_t priority, |
1533 | struct GNUNET_TIME_Relative timeout, | 1369 | struct GNUNET_MQ_Envelope *env) |
1534 | size_t size, | ||
1535 | GSF_GetMessageCallback gmc, void *gmc_cls) | ||
1536 | { | 1370 | { |
1537 | struct GSF_PeerTransmitHandle *pth; | 1371 | struct GSF_PeerTransmitHandle *pth; |
1538 | struct GSF_PeerTransmitHandle *pos; | 1372 | struct GSF_PeerTransmitHandle *pos; |
@@ -1540,10 +1374,7 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, | |||
1540 | 1374 | ||
1541 | pth = GNUNET_new (struct GSF_PeerTransmitHandle); | 1375 | pth = GNUNET_new (struct GSF_PeerTransmitHandle); |
1542 | pth->transmission_request_start_time = GNUNET_TIME_absolute_get (); | 1376 | pth->transmission_request_start_time = GNUNET_TIME_absolute_get (); |
1543 | pth->timeout = GNUNET_TIME_relative_to_absolute (timeout); | 1377 | pth->env = env; |
1544 | pth->gmc = gmc; | ||
1545 | pth->gmc_cls = gmc_cls; | ||
1546 | pth->size = size; | ||
1547 | pth->is_query = is_query; | 1378 | pth->is_query = is_query; |
1548 | pth->priority = priority; | 1379 | pth->priority = priority; |
1549 | pth->cp = cp; | 1380 | pth->cp = cp; |
@@ -1563,39 +1394,7 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, | |||
1563 | cp->ppd.pending_queries++; | 1394 | cp->ppd.pending_queries++; |
1564 | else if (GNUNET_NO == is_query) | 1395 | else if (GNUNET_NO == is_query) |
1565 | cp->ppd.pending_replies++; | 1396 | cp->ppd.pending_replies++; |
1566 | pth->timeout_task | ||
1567 | = GNUNET_SCHEDULER_add_delayed (timeout, | ||
1568 | &peer_transmit_timeout, | ||
1569 | pth); | ||
1570 | schedule_transmission (pth); | 1397 | schedule_transmission (pth); |
1571 | return pth; | ||
1572 | } | ||
1573 | |||
1574 | |||
1575 | /** | ||
1576 | * Cancel an earlier request for transmission. | ||
1577 | * | ||
1578 | * @param pth request to cancel | ||
1579 | */ | ||
1580 | void | ||
1581 | GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth) | ||
1582 | { | ||
1583 | struct GSF_ConnectedPeer *cp; | ||
1584 | |||
1585 | if (NULL != pth->timeout_task) | ||
1586 | { | ||
1587 | GNUNET_SCHEDULER_cancel (pth->timeout_task); | ||
1588 | pth->timeout_task = NULL; | ||
1589 | } | ||
1590 | cp = pth->cp; | ||
1591 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, | ||
1592 | cp->pth_tail, | ||
1593 | pth); | ||
1594 | if (GNUNET_YES == pth->is_query) | ||
1595 | GNUNET_assert (0 < cp->ppd.pending_queries--); | ||
1596 | else if (GNUNET_NO == pth->is_query) | ||
1597 | GNUNET_assert (0 < cp->ppd.pending_replies--); | ||
1598 | GNUNET_free (pth); | ||
1599 | } | 1398 | } |
1600 | 1399 | ||
1601 | 1400 | ||
@@ -1683,7 +1482,9 @@ flush_respect (void *cls, | |||
1683 | GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect, | 1482 | GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect, |
1684 | sizeof (cp->ppd.respect), | 1483 | sizeof (cp->ppd.respect), |
1685 | GNUNET_TIME_UNIT_FOREVER_ABS, | 1484 | GNUNET_TIME_UNIT_FOREVER_ABS, |
1686 | GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL); | 1485 | GNUNET_PEERSTORE_STOREOPTION_REPLACE, |
1486 | NULL, | ||
1487 | NULL); | ||
1687 | return GNUNET_OK; | 1488 | return GNUNET_OK; |
1688 | } | 1489 | } |
1689 | 1490 | ||
@@ -1693,26 +1494,30 @@ flush_respect (void *cls, | |||
1693 | * record. | 1494 | * record. |
1694 | * | 1495 | * |
1695 | * @param cls unused | 1496 | * @param cls unused |
1696 | * @param peer identity of peer that connected | 1497 | * @param peer identity of peer that disconnected |
1498 | * @param internal_cls the corresponding `struct GSF_ConnectedPeer` | ||
1697 | */ | 1499 | */ |
1698 | void | 1500 | void |
1699 | GSF_peer_disconnect_handler_ (void *cls, | 1501 | GSF_peer_disconnect_handler (void *cls, |
1700 | const struct GNUNET_PeerIdentity *peer) | 1502 | const struct GNUNET_PeerIdentity *peer, |
1503 | void *internal_cls) | ||
1701 | { | 1504 | { |
1702 | struct GSF_ConnectedPeer *cp; | 1505 | struct GSF_ConnectedPeer *cp = internal_cls; |
1703 | struct GSF_PeerTransmitHandle *pth; | 1506 | struct GSF_PeerTransmitHandle *pth; |
1704 | struct GSF_DelayedHandle *dh; | 1507 | struct GSF_DelayedHandle *dh; |
1705 | 1508 | ||
1706 | cp = GSF_peer_get_ (peer); | ||
1707 | if (NULL == cp) | 1509 | if (NULL == cp) |
1708 | return; /* must have been disconnect from core with | 1510 | return; /* must have been disconnect from core with |
1709 | * 'peer' == my_id, ignore */ | 1511 | * 'peer' == my_id, ignore */ |
1710 | flush_respect (NULL, peer, cp); | 1512 | flush_respect (NULL, |
1513 | peer, | ||
1514 | cp); | ||
1711 | GNUNET_assert (GNUNET_YES == | 1515 | GNUNET_assert (GNUNET_YES == |
1712 | GNUNET_CONTAINER_multipeermap_remove (cp_map, | 1516 | GNUNET_CONTAINER_multipeermap_remove (cp_map, |
1713 | peer, | 1517 | peer, |
1714 | cp)); | 1518 | cp)); |
1715 | GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"), | 1519 | GNUNET_STATISTICS_set (GSF_stats, |
1520 | gettext_noop ("# peers connected"), | ||
1716 | GNUNET_CONTAINER_multipeermap_size (cp_map), | 1521 | GNUNET_CONTAINER_multipeermap_size (cp_map), |
1717 | GNUNET_NO); | 1522 | GNUNET_NO); |
1718 | if (NULL != cp->respect_iterate_req) | 1523 | if (NULL != cp->respect_iterate_req) |
@@ -1720,11 +1525,6 @@ GSF_peer_disconnect_handler_ (void *cls, | |||
1720 | GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); | 1525 | GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); |
1721 | cp->respect_iterate_req = NULL; | 1526 | cp->respect_iterate_req = NULL; |
1722 | } | 1527 | } |
1723 | if (NULL != cp->migration_pth) | ||
1724 | { | ||
1725 | GSF_peer_transmit_cancel_ (cp->migration_pth); | ||
1726 | cp->migration_pth = NULL; | ||
1727 | } | ||
1728 | if (NULL != cp->rc) | 1528 | if (NULL != cp->rc) |
1729 | { | 1529 | { |
1730 | GNUNET_ATS_reserve_bandwidth_cancel (cp->rc); | 1530 | GNUNET_ATS_reserve_bandwidth_cancel (cp->rc); |
@@ -1748,19 +1548,8 @@ GSF_peer_disconnect_handler_ (void *cls, | |||
1748 | 0, | 1548 | 0, |
1749 | sizeof (cp->ppd.last_p2p_replies)); | 1549 | sizeof (cp->ppd.last_p2p_replies)); |
1750 | GSF_push_stop_ (cp); | 1550 | GSF_push_stop_ (cp); |
1751 | if (NULL != cp->cth) | ||
1752 | { | ||
1753 | GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); | ||
1754 | cp->cth = NULL; | ||
1755 | } | ||
1756 | GNUNET_assert (0 == cp->cth_in_progress); | ||
1757 | while (NULL != (pth = cp->pth_head)) | 1551 | while (NULL != (pth = cp->pth_head)) |
1758 | { | 1552 | { |
1759 | if (pth->timeout_task != NULL) | ||
1760 | { | ||
1761 | GNUNET_SCHEDULER_cancel (pth->timeout_task); | ||
1762 | pth->timeout_task = NULL; | ||
1763 | } | ||
1764 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, | 1553 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, |
1765 | cp->pth_tail, | 1554 | cp->pth_tail, |
1766 | pth); | 1555 | pth); |
@@ -1768,7 +1557,6 @@ GSF_peer_disconnect_handler_ (void *cls, | |||
1768 | GNUNET_assert (0 < cp->ppd.pending_queries--); | 1557 | GNUNET_assert (0 < cp->ppd.pending_queries--); |
1769 | else if (GNUNET_NO == pth->is_query) | 1558 | else if (GNUNET_NO == pth->is_query) |
1770 | GNUNET_assert (0 < cp->ppd.pending_replies--); | 1559 | GNUNET_assert (0 < cp->ppd.pending_replies--); |
1771 | pth->gmc (pth->gmc_cls, 0, NULL); | ||
1772 | GNUNET_free (pth); | 1560 | GNUNET_free (pth); |
1773 | } | 1561 | } |
1774 | while (NULL != (dh = cp->delayed_head)) | 1562 | while (NULL != (dh = cp->delayed_head)) |
@@ -1776,9 +1564,9 @@ GSF_peer_disconnect_handler_ (void *cls, | |||
1776 | GNUNET_CONTAINER_DLL_remove (cp->delayed_head, | 1564 | GNUNET_CONTAINER_DLL_remove (cp->delayed_head, |
1777 | cp->delayed_tail, | 1565 | cp->delayed_tail, |
1778 | dh); | 1566 | dh); |
1567 | GNUNET_MQ_discard (dh->env); | ||
1779 | cp->delay_queue_size--; | 1568 | cp->delay_queue_size--; |
1780 | GNUNET_SCHEDULER_cancel (dh->delay_task); | 1569 | GNUNET_SCHEDULER_cancel (dh->delay_task); |
1781 | GNUNET_free (dh->pm); | ||
1782 | GNUNET_free (dh); | 1570 | GNUNET_free (dh); |
1783 | } | 1571 | } |
1784 | GNUNET_PEER_change_rc (cp->ppd.pid, -1); | 1572 | GNUNET_PEER_change_rc (cp->ppd.pid, -1); |
@@ -1883,40 +1671,6 @@ GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp) | |||
1883 | 1671 | ||
1884 | 1672 | ||
1885 | /** | 1673 | /** |
1886 | * Assemble a migration stop message for transmission. | ||
1887 | * | ||
1888 | * @param cls the `struct GSF_ConnectedPeer` to use | ||
1889 | * @param size number of bytes we're allowed to write to @a buf | ||
1890 | * @param buf where to copy the message | ||
1891 | * @return number of bytes copied to @a buf | ||
1892 | */ | ||
1893 | static size_t | ||
1894 | create_migration_stop_message (void *cls, | ||
1895 | size_t size, | ||
1896 | void *buf) | ||
1897 | { | ||
1898 | struct GSF_ConnectedPeer *cp = cls; | ||
1899 | struct MigrationStopMessage msm; | ||
1900 | |||
1901 | cp->migration_pth = NULL; | ||
1902 | if (NULL == buf) | ||
1903 | return 0; | ||
1904 | GNUNET_assert (size >= sizeof (struct MigrationStopMessage)); | ||
1905 | msm.header.size = htons (sizeof (struct MigrationStopMessage)); | ||
1906 | msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); | ||
1907 | msm.reserved = htonl (0); | ||
1908 | msm.duration = | ||
1909 | GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining | ||
1910 | (cp->last_migration_block)); | ||
1911 | GNUNET_memcpy (buf, &msm, sizeof (struct MigrationStopMessage)); | ||
1912 | GNUNET_STATISTICS_update (GSF_stats, | ||
1913 | gettext_noop ("# migration stop messages sent"), | ||
1914 | 1, GNUNET_NO); | ||
1915 | return sizeof (struct MigrationStopMessage); | ||
1916 | } | ||
1917 | |||
1918 | |||
1919 | /** | ||
1920 | * Ask a peer to stop migrating data to us until the given point | 1674 | * Ask a peer to stop migrating data to us until the given point |
1921 | * in time. | 1675 | * in time. |
1922 | * | 1676 | * |
@@ -1927,6 +1681,9 @@ void | |||
1927 | GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, | 1681 | GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, |
1928 | struct GNUNET_TIME_Absolute block_time) | 1682 | struct GNUNET_TIME_Absolute block_time) |
1929 | { | 1683 | { |
1684 | struct GNUNET_MQ_Envelope *env; | ||
1685 | struct MigrationStopMessage *msm; | ||
1686 | |||
1930 | if (cp->last_migration_block.abs_value_us > block_time.abs_value_us) | 1687 | if (cp->last_migration_block.abs_value_us > block_time.abs_value_us) |
1931 | { | 1688 | { |
1932 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1689 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1939,13 +1696,20 @@ GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, | |||
1939 | GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time), | 1696 | GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time), |
1940 | GNUNET_YES)); | 1697 | GNUNET_YES)); |
1941 | cp->last_migration_block = block_time; | 1698 | cp->last_migration_block = block_time; |
1942 | if (NULL != cp->migration_pth) | 1699 | env = GNUNET_MQ_msg (msm, |
1943 | GSF_peer_transmit_cancel_ (cp->migration_pth); | 1700 | GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); |
1944 | cp->migration_pth = | 1701 | msm->reserved = htonl (0); |
1945 | GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX, | 1702 | msm->duration |
1946 | GNUNET_TIME_UNIT_FOREVER_REL, | 1703 | = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining |
1947 | sizeof (struct MigrationStopMessage), | 1704 | (cp->last_migration_block)); |
1948 | &create_migration_stop_message, cp); | 1705 | GNUNET_STATISTICS_update (GSF_stats, |
1706 | gettext_noop ("# migration stop messages sent"), | ||
1707 | 1, | ||
1708 | GNUNET_NO); | ||
1709 | GSF_peer_transmit_ (cp, | ||
1710 | GNUNET_SYSERR, | ||
1711 | UINT32_MAX, | ||
1712 | env); | ||
1949 | } | 1713 | } |
1950 | 1714 | ||
1951 | 1715 | ||
@@ -1998,24 +1762,6 @@ GSF_connected_peer_init_ () | |||
1998 | 1762 | ||
1999 | 1763 | ||
2000 | /** | 1764 | /** |
2001 | * Iterator to free peer entries. | ||
2002 | * | ||
2003 | * @param cls closure, unused | ||
2004 | * @param key current key code | ||
2005 | * @param value value in the hash map (peer entry) | ||
2006 | * @return #GNUNET_YES (we should continue to iterate) | ||
2007 | */ | ||
2008 | static int | ||
2009 | clean_peer (void *cls, | ||
2010 | const struct GNUNET_PeerIdentity *key, | ||
2011 | void *value) | ||
2012 | { | ||
2013 | GSF_peer_disconnect_handler_ (NULL, key); | ||
2014 | return GNUNET_YES; | ||
2015 | } | ||
2016 | |||
2017 | |||
2018 | /** | ||
2019 | * Shutdown peer management subsystem. | 1765 | * Shutdown peer management subsystem. |
2020 | */ | 1766 | */ |
2021 | void | 1767 | void |
@@ -2024,9 +1770,6 @@ GSF_connected_peer_done_ () | |||
2024 | GNUNET_CONTAINER_multipeermap_iterate (cp_map, | 1770 | GNUNET_CONTAINER_multipeermap_iterate (cp_map, |
2025 | &flush_respect, | 1771 | &flush_respect, |
2026 | NULL); | 1772 | NULL); |
2027 | GNUNET_CONTAINER_multipeermap_iterate (cp_map, | ||
2028 | &clean_peer, | ||
2029 | NULL); | ||
2030 | GNUNET_SCHEDULER_cancel (fr_task); | 1773 | GNUNET_SCHEDULER_cancel (fr_task); |
2031 | fr_task = NULL; | 1774 | fr_task = NULL; |
2032 | GNUNET_CONTAINER_multipeermap_destroy (cp_map); | 1775 | GNUNET_CONTAINER_multipeermap_destroy (cp_map); |
@@ -2072,7 +1815,8 @@ GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc) | |||
2072 | { | 1815 | { |
2073 | if (NULL == cp_map) | 1816 | if (NULL == cp_map) |
2074 | return; /* already cleaned up */ | 1817 | return; /* already cleaned up */ |
2075 | GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client, | 1818 | GNUNET_CONTAINER_multipeermap_iterate (cp_map, |
1819 | &clean_local_client, | ||
2076 | (void *) lc); | 1820 | (void *) lc); |
2077 | } | 1821 | } |
2078 | 1822 | ||
diff --git a/src/fs/gnunet-service-fs_cp.h b/src/fs/gnunet-service-fs_cp.h index 491ab34e3..3aba5c6a6 100644 --- a/src/fs/gnunet-service-fs_cp.h +++ b/src/fs/gnunet-service-fs_cp.h | |||
@@ -120,11 +120,16 @@ struct GSF_PeerPerformanceData | |||
120 | double avg_priority; | 120 | double avg_priority; |
121 | 121 | ||
122 | /** | 122 | /** |
123 | * The peer's identity. | 123 | * The peer's identity (interned version). |
124 | */ | 124 | */ |
125 | GNUNET_PEER_Id pid; | 125 | GNUNET_PEER_Id pid; |
126 | 126 | ||
127 | /** | 127 | /** |
128 | * The peer's identity (pointer). | ||
129 | */ | ||
130 | const struct GNUNET_PeerIdentity *peer; | ||
131 | |||
132 | /** | ||
128 | * Respect rating for this peer | 133 | * Respect rating for this peer |
129 | */ | 134 | */ |
130 | uint32_t respect; | 135 | uint32_t respect; |
@@ -185,17 +190,6 @@ typedef void | |||
185 | 190 | ||
186 | 191 | ||
187 | /** | 192 | /** |
188 | * Function called after the creation of a connected peer record is complete. | ||
189 | * | ||
190 | * @param cls closure | ||
191 | * @param cp handle to the newly created connected peer record | ||
192 | */ | ||
193 | typedef void | ||
194 | (*GSF_ConnectedPeerCreationCallback) (void *cls, | ||
195 | struct GSF_ConnectedPeer *cp); | ||
196 | |||
197 | |||
198 | /** | ||
199 | * Handle to cancel a transmission request. | 193 | * Handle to cancel a transmission request. |
200 | */ | 194 | */ |
201 | struct GSF_PeerTransmitHandle; | 195 | struct GSF_PeerTransmitHandle; |
@@ -205,14 +199,15 @@ struct GSF_PeerTransmitHandle; | |||
205 | * A peer connected to us. Setup the connected peer | 199 | * A peer connected to us. Setup the connected peer |
206 | * records. | 200 | * records. |
207 | * | 201 | * |
202 | * @param cls NULL | ||
208 | * @param peer identity of peer that connected | 203 | * @param peer identity of peer that connected |
209 | * @param creation_cb callback function when the record is created. | 204 | * @param mq queue for sending messages to @a peer |
210 | * @param creation_cb_cls closure for @creation_cb | 205 | * @return internal handle for the peer |
211 | */ | 206 | */ |
212 | void | 207 | void * |
213 | GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, | 208 | GSF_peer_connect_handler (void *cls, |
214 | GSF_ConnectedPeerCreationCallback creation_cb, | 209 | const struct GNUNET_PeerIdentity *peer, |
215 | void *creation_cb_cls); | 210 | struct GNUNET_MQ_Handle *mq); |
216 | 211 | ||
217 | 212 | ||
218 | /** | 213 | /** |
@@ -242,30 +237,15 @@ GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id, | |||
242 | * the callback is invoked with a 'NULL' buffer. | 237 | * the callback is invoked with a 'NULL' buffer. |
243 | * | 238 | * |
244 | * @param cp target peer | 239 | * @param cp target peer |
245 | * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) | 240 | * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) |
246 | * @param priority how important is this request? | 241 | * @param priority how important is this request? |
247 | * @param timeout when does this request timeout (call gmc with error) | 242 | * @param env envelope of message to send |
248 | * @param size number of bytes we would like to send to the peer | ||
249 | * @param gmc function to call to get the message | ||
250 | * @param gmc_cls closure for gmc | ||
251 | * @return handle to cancel request | ||
252 | */ | 243 | */ |
253 | struct GSF_PeerTransmitHandle * | 244 | void |
254 | GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, | 245 | GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, |
255 | int is_query, | 246 | int is_query, |
256 | uint32_t priority, | 247 | uint32_t priority, |
257 | struct GNUNET_TIME_Relative timeout, | 248 | struct GNUNET_MQ_Envelope *env); |
258 | size_t size, GSF_GetMessageCallback gmc, | ||
259 | void *gmc_cls); | ||
260 | |||
261 | |||
262 | /** | ||
263 | * Cancel an earlier request for transmission. | ||
264 | * | ||
265 | * @param pth request to cancel | ||
266 | */ | ||
267 | void | ||
268 | GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth); | ||
269 | 249 | ||
270 | 250 | ||
271 | /** | 251 | /** |
@@ -307,35 +287,25 @@ GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp, | |||
307 | 287 | ||
308 | 288 | ||
309 | /** | 289 | /** |
310 | * Handle P2P "MIGRATION_STOP" message. | 290 | * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message. |
311 | * | 291 | * |
312 | * @param cls closure, always NULL | 292 | * @param cls closure, the `struct GSF_ConnectedPeer` |
313 | * @param other the other peer involved (sender or receiver, NULL | 293 | * @param msm the actual message |
314 | * for loopback messages where we are both sender and receiver) | ||
315 | * @param message the actual message | ||
316 | * @return #GNUNET_OK to keep the connection open, | ||
317 | * #GNUNET_SYSERR to close it (signal serious error) | ||
318 | */ | 294 | */ |
319 | int | 295 | void |
320 | GSF_handle_p2p_migration_stop_ (void *cls, | 296 | handle_p2p_migration_stop (void *cls, |
321 | const struct GNUNET_PeerIdentity *other, | 297 | const struct MigrationStopMessage *message); |
322 | const struct GNUNET_MessageHeader *message); | ||
323 | 298 | ||
324 | 299 | ||
325 | /** | 300 | /** |
326 | * Handle P2P "QUERY" message. Only responsible for creating the | 301 | * Handle P2P "QUERY" message. |
327 | * request entry itself and setting up reply callback and cancellation | ||
328 | * on peer disconnect. Does NOT execute the actual request strategy | ||
329 | * (planning) or local database operations. | ||
330 | * | 302 | * |
331 | * @param other the other peer involved (sender or receiver, NULL | 303 | * @param cls the `struct GSF_ConnectedPeer` of the other sender |
332 | * for loopback messages where we are both sender and receiver) | 304 | * @param gm the actual message |
333 | * @param message the actual message | ||
334 | * @return pending request handle, NULL on error | ||
335 | */ | 305 | */ |
336 | struct GSF_PendingRequest * | 306 | void |
337 | GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | 307 | handle_p2p_get (void *cls, |
338 | const struct GNUNET_MessageHeader *message); | 308 | const struct GetMessage *gm); |
339 | 309 | ||
340 | 310 | ||
341 | /** | 311 | /** |
@@ -366,10 +336,12 @@ GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, | |||
366 | * | 336 | * |
367 | * @param cls unused | 337 | * @param cls unused |
368 | * @param peer identity of peer that connected | 338 | * @param peer identity of peer that connected |
339 | * @param internal_cls our `struct GSF_ConnectedPeer` for @a peer | ||
369 | */ | 340 | */ |
370 | void | 341 | void |
371 | GSF_peer_disconnect_handler_ (void *cls, | 342 | GSF_peer_disconnect_handler (void *cls, |
372 | const struct GNUNET_PeerIdentity *peer); | 343 | const struct GNUNET_PeerIdentity *peer, |
344 | void *internal_cls); | ||
373 | 345 | ||
374 | 346 | ||
375 | /** | 347 | /** |
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index b338c1a13..098c3d180 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c | |||
@@ -189,11 +189,6 @@ struct PeerPlan | |||
189 | struct GNUNET_CONTAINER_MultiHashMap *plan_map; | 189 | struct GNUNET_CONTAINER_MultiHashMap *plan_map; |
190 | 190 | ||
191 | /** | 191 | /** |
192 | * Current transmission request handle. | ||
193 | */ | ||
194 | struct GSF_PeerTransmitHandle *pth; | ||
195 | |||
196 | /** | ||
197 | * Peer for which this is the plan. | 192 | * Peer for which this is the plan. |
198 | */ | 193 | */ |
199 | struct GSF_ConnectedPeer *cp; | 194 | struct GSF_ConnectedPeer *cp; |
@@ -202,6 +197,12 @@ struct PeerPlan | |||
202 | * Current task for executing the plan. | 197 | * Current task for executing the plan. |
203 | */ | 198 | */ |
204 | struct GNUNET_SCHEDULER_Task *task; | 199 | struct GNUNET_SCHEDULER_Task *task; |
200 | |||
201 | /** | ||
202 | * Current message under transmission for the plan. | ||
203 | */ | ||
204 | struct GNUNET_MQ_Envelope *env; | ||
205 | |||
205 | }; | 206 | }; |
206 | 207 | ||
207 | 208 | ||
@@ -241,15 +242,6 @@ get_rp_key (struct GSF_RequestPlan *rp) | |||
241 | 242 | ||
242 | 243 | ||
243 | /** | 244 | /** |
244 | * Figure out when and how to transmit to the given peer. | ||
245 | * | ||
246 | * @param cls the `struct GSF_ConnectedPeer` for transmission | ||
247 | */ | ||
248 | static void | ||
249 | schedule_peer_transmission (void *cls); | ||
250 | |||
251 | |||
252 | /** | ||
253 | * Insert the given request plan into the heap with the appropriate weight. | 245 | * Insert the given request plan into the heap with the appropriate weight. |
254 | * | 246 | * |
255 | * @param pp associated peer's plan | 247 | * @param pp associated peer's plan |
@@ -329,21 +321,22 @@ plan (struct PeerPlan *pp, | |||
329 | rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay); | 321 | rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay); |
330 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 322 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
331 | "Earliest (re)transmission for `%s' in %us\n", | 323 | "Earliest (re)transmission for `%s' in %us\n", |
332 | GNUNET_h2s (&prd->query), rp->transmission_counter); | 324 | GNUNET_h2s (&prd->query), |
325 | rp->transmission_counter); | ||
333 | GNUNET_assert (rp->hn == NULL); | 326 | GNUNET_assert (rp->hn == NULL); |
334 | if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us) | 327 | if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us) |
335 | rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority); | 328 | rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, |
329 | rp, | ||
330 | rp->priority); | ||
336 | else | 331 | else |
337 | rp->hn = | 332 | rp->hn = |
338 | GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp, | 333 | GNUNET_CONTAINER_heap_insert (pp->delay_heap, |
334 | rp, | ||
339 | rp->earliest_transmission.abs_value_us); | 335 | rp->earliest_transmission.abs_value_us); |
340 | GNUNET_assert (GNUNET_YES == | 336 | GNUNET_assert (GNUNET_YES == |
341 | GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map, | 337 | GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map, |
342 | get_rp_key (rp), | 338 | get_rp_key (rp), |
343 | rp)); | 339 | rp)); |
344 | if (NULL != pp->task) | ||
345 | GNUNET_SCHEDULER_cancel (pp->task); | ||
346 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); | ||
347 | #undef N | 340 | #undef N |
348 | } | 341 | } |
349 | 342 | ||
@@ -383,75 +376,6 @@ get_latest (const struct GSF_RequestPlan *rp) | |||
383 | 376 | ||
384 | 377 | ||
385 | /** | 378 | /** |
386 | * Function called to get a message for transmission. | ||
387 | * | ||
388 | * @param cls closure | ||
389 | * @param buf_size number of bytes available in @a buf | ||
390 | * @param buf where to copy the message, NULL on error (peer disconnect) | ||
391 | * @return number of bytes copied to @a buf, can be 0 (without indicating an error) | ||
392 | */ | ||
393 | static size_t | ||
394 | transmit_message_callback (void *cls, | ||
395 | size_t buf_size, | ||
396 | void *buf) | ||
397 | { | ||
398 | struct PeerPlan *pp = cls; | ||
399 | struct GSF_RequestPlan *rp; | ||
400 | size_t msize; | ||
401 | |||
402 | pp->pth = NULL; | ||
403 | if (NULL == buf) | ||
404 | { | ||
405 | /* failed, try again... */ | ||
406 | if (NULL != pp->task) | ||
407 | GNUNET_SCHEDULER_cancel (pp->task); | ||
408 | |||
409 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); | ||
410 | GNUNET_STATISTICS_update (GSF_stats, | ||
411 | gettext_noop | ||
412 | ("# transmission failed (core has no bandwidth)"), | ||
413 | 1, GNUNET_NO); | ||
414 | return 0; | ||
415 | } | ||
416 | rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); | ||
417 | if (NULL == rp) | ||
418 | { | ||
419 | if (NULL != pp->task) | ||
420 | GNUNET_SCHEDULER_cancel (pp->task); | ||
421 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); | ||
422 | return 0; | ||
423 | } | ||
424 | msize = GSF_pending_request_get_message_ (get_latest (rp), | ||
425 | buf_size, | ||
426 | buf); | ||
427 | if (msize > buf_size) | ||
428 | { | ||
429 | if (NULL != pp->task) | ||
430 | GNUNET_SCHEDULER_cancel (pp->task); | ||
431 | /* buffer to small (message changed), try again */ | ||
432 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); | ||
433 | return 0; | ||
434 | } | ||
435 | /* remove from root, add again elsewhere... */ | ||
436 | GNUNET_assert (rp == | ||
437 | GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)); | ||
438 | rp->hn = NULL; | ||
439 | rp->last_transmission = GNUNET_TIME_absolute_get (); | ||
440 | rp->transmission_counter++; | ||
441 | total_delay++; | ||
442 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
443 | "Executing plan %p executed %u times, planning retransmission\n", | ||
444 | rp, rp->transmission_counter); | ||
445 | plan (pp, rp); | ||
446 | GNUNET_STATISTICS_update (GSF_stats, | ||
447 | gettext_noop ("# query messages sent to other peers"), | ||
448 | 1, | ||
449 | GNUNET_NO); | ||
450 | return msize; | ||
451 | } | ||
452 | |||
453 | |||
454 | /** | ||
455 | * Figure out when and how to transmit to the given peer. | 379 | * Figure out when and how to transmit to the given peer. |
456 | * | 380 | * |
457 | * @param cls the `struct PeerPlan` | 381 | * @param cls the `struct PeerPlan` |
@@ -461,14 +385,16 @@ schedule_peer_transmission (void *cls) | |||
461 | { | 385 | { |
462 | struct PeerPlan *pp = cls; | 386 | struct PeerPlan *pp = cls; |
463 | struct GSF_RequestPlan *rp; | 387 | struct GSF_RequestPlan *rp; |
464 | size_t msize; | ||
465 | struct GNUNET_TIME_Relative delay; | 388 | struct GNUNET_TIME_Relative delay; |
466 | 389 | ||
467 | pp->task = NULL; | 390 | if (NULL != pp->task) |
468 | if (NULL != pp->pth) | 391 | { |
392 | pp->task = NULL; | ||
393 | } | ||
394 | else | ||
469 | { | 395 | { |
470 | GSF_peer_transmit_cancel_ (pp->pth); | 396 | GNUNET_assert (NULL != pp->env); |
471 | pp->pth = NULL; | 397 | pp->env = NULL; |
472 | } | 398 | } |
473 | /* move ready requests to priority queue */ | 399 | /* move ready requests to priority queue */ |
474 | while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && | 400 | while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && |
@@ -508,23 +434,40 @@ schedule_peer_transmission (void *cls) | |||
508 | return; | 434 | return; |
509 | } | 435 | } |
510 | #if INSANE_STATISTICS | 436 | #if INSANE_STATISTICS |
511 | GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"), | 437 | GNUNET_STATISTICS_update (GSF_stats, |
512 | 1, GNUNET_NO); | 438 | gettext_noop ("# query plans executed"), |
439 | 1, | ||
440 | GNUNET_NO); | ||
513 | #endif | 441 | #endif |
514 | /* process from priority heap */ | 442 | /* process from priority heap */ |
515 | rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); | 443 | rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap); |
516 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 444 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
517 | "Executing query plan %p\n", | 445 | "Executing query plan %p\n", |
518 | rp); | 446 | rp); |
519 | GNUNET_assert (NULL != rp); | 447 | GNUNET_assert (NULL != rp); |
520 | msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL); | 448 | rp->hn = NULL; |
521 | pp->pth = | 449 | rp->last_transmission = GNUNET_TIME_absolute_get (); |
522 | GSF_peer_transmit_ (pp->cp, GNUNET_YES, | 450 | rp->transmission_counter++; |
523 | rp->priority, | 451 | total_delay++; |
524 | GNUNET_TIME_UNIT_FOREVER_REL, | 452 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
525 | msize, | 453 | "Executing plan %p executed %u times, planning retransmission\n", |
526 | &transmit_message_callback, pp); | 454 | rp, |
527 | GNUNET_assert (NULL != pp->pth); | 455 | rp->transmission_counter); |
456 | GNUNET_assert (NULL == pp->env); | ||
457 | pp->env = GSF_pending_request_get_message_ (get_latest (rp)); | ||
458 | GNUNET_MQ_notify_sent (pp->env, | ||
459 | &schedule_peer_transmission, | ||
460 | pp); | ||
461 | GSF_peer_transmit_ (pp->cp, | ||
462 | GNUNET_YES, | ||
463 | rp->priority, | ||
464 | pp->env); | ||
465 | GNUNET_STATISTICS_update (GSF_stats, | ||
466 | gettext_noop ("# query messages sent to other peers"), | ||
467 | 1, | ||
468 | GNUNET_NO); | ||
469 | plan (pp, | ||
470 | rp); | ||
528 | } | 471 | } |
529 | 472 | ||
530 | 473 | ||
@@ -646,6 +589,8 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp, | |||
646 | id, | 589 | id, |
647 | pp, | 590 | pp, |
648 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | 591 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); |
592 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, | ||
593 | pp); | ||
649 | } | 594 | } |
650 | mpc.merged = GNUNET_NO; | 595 | mpc.merged = GNUNET_NO; |
651 | mpc.pr = pr; | 596 | mpc.pr = pr; |
@@ -710,16 +655,16 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) | |||
710 | GNUNET_assert (GNUNET_YES == | 655 | GNUNET_assert (GNUNET_YES == |
711 | GNUNET_CONTAINER_multipeermap_remove (plans, id, | 656 | GNUNET_CONTAINER_multipeermap_remove (plans, id, |
712 | pp)); | 657 | pp)); |
713 | if (NULL != pp->pth) | ||
714 | { | ||
715 | GSF_peer_transmit_cancel_ (pp->pth); | ||
716 | pp->pth = NULL; | ||
717 | } | ||
718 | if (NULL != pp->task) | 658 | if (NULL != pp->task) |
719 | { | 659 | { |
720 | GNUNET_SCHEDULER_cancel (pp->task); | 660 | GNUNET_SCHEDULER_cancel (pp->task); |
721 | pp->task = NULL; | 661 | pp->task = NULL; |
722 | } | 662 | } |
663 | if (NULL != pp->env) | ||
664 | { | ||
665 | GNUNET_MQ_send_cancel (pp->env); | ||
666 | pp->env = NULL; | ||
667 | } | ||
723 | while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) | 668 | while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) |
724 | { | 669 | { |
725 | GNUNET_break (GNUNET_YES == | 670 | GNUNET_break (GNUNET_YES == |
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index cd58992c1..f8a7b61f0 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c | |||
@@ -512,21 +512,17 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, | |||
512 | 512 | ||
513 | /** | 513 | /** |
514 | * Generate the message corresponding to the given pending request for | 514 | * Generate the message corresponding to the given pending request for |
515 | * transmission to other peers (or at least determine its size). | 515 | * transmission to other peers. |
516 | * | 516 | * |
517 | * @param pr request to generate the message for | 517 | * @param pr request to generate the message for |
518 | * @param buf_size number of bytes available in @a buf | 518 | * @return envelope with the request message |
519 | * @param buf where to copy the message (can be NULL) | ||
520 | * @return number of bytes needed (if `>` @a buf_size) or used | ||
521 | */ | 519 | */ |
522 | size_t | 520 | struct GNUNET_MQ_Envelope * |
523 | GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | 521 | GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr) |
524 | size_t buf_size, void *buf) | ||
525 | { | 522 | { |
526 | char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE]; | 523 | struct GNUNET_MQ_Envelope *env; |
527 | struct GetMessage *gm; | 524 | struct GetMessage *gm; |
528 | struct GNUNET_PeerIdentity *ext; | 525 | struct GNUNET_PeerIdentity *ext; |
529 | size_t msize; | ||
530 | unsigned int k; | 526 | unsigned int k; |
531 | uint32_t bm; | 527 | uint32_t bm; |
532 | uint32_t prio; | 528 | uint32_t prio; |
@@ -535,11 +531,10 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | |||
535 | int64_t ttl; | 531 | int64_t ttl; |
536 | int do_route; | 532 | int do_route; |
537 | 533 | ||
538 | if (buf_size > 0) | 534 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
539 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 535 | "Building request message for `%s' of type %d\n", |
540 | "Building request message for `%s' of type %d\n", | 536 | GNUNET_h2s (&pr->public_data.query), |
541 | GNUNET_h2s (&pr->public_data.query), | 537 | pr->public_data.type); |
542 | pr->public_data.type); | ||
543 | k = 0; | 538 | k = 0; |
544 | bm = 0; | 539 | bm = 0; |
545 | do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY)); | 540 | do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY)); |
@@ -559,13 +554,9 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | |||
559 | k++; | 554 | k++; |
560 | } | 555 | } |
561 | bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf); | 556 | bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf); |
562 | msize = sizeof (struct GetMessage) + bf_size + k * sizeof (struct GNUNET_PeerIdentity); | 557 | env = GNUNET_MQ_msg_extra (gm, |
563 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); | 558 | bf_size + k * sizeof (struct GNUNET_PeerIdentity), |
564 | if (buf_size < msize) | 559 | GNUNET_MESSAGE_TYPE_FS_GET); |
565 | return msize; | ||
566 | gm = (struct GetMessage *) lbuf; | ||
567 | gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); | ||
568 | gm->header.size = htons (msize); | ||
569 | gm->type = htonl (pr->public_data.type); | 560 | gm->type = htonl (pr->public_data.type); |
570 | if (do_route) | 561 | if (do_route) |
571 | prio = | 562 | prio = |
@@ -585,7 +576,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | |||
585 | gm->query = pr->public_data.query; | 576 | gm->query = pr->public_data.query; |
586 | ext = (struct GNUNET_PeerIdentity *) &gm[1]; | 577 | ext = (struct GNUNET_PeerIdentity *) &gm[1]; |
587 | k = 0; | 578 | k = 0; |
588 | if (!do_route) | 579 | if (! do_route) |
589 | GNUNET_PEER_resolve (pr->sender_pid, | 580 | GNUNET_PEER_resolve (pr->sender_pid, |
590 | &ext[k++]); | 581 | &ext[k++]); |
591 | if (NULL != pr->public_data.target) | 582 | if (NULL != pr->public_data.target) |
@@ -595,8 +586,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | |||
595 | GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, | 586 | GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, |
596 | (char *) &ext[k], | 587 | (char *) &ext[k], |
597 | bf_size)); | 588 | bf_size)); |
598 | GNUNET_memcpy (buf, gm, msize); | 589 | return env; |
599 | return msize; | ||
600 | } | 590 | } |
601 | 591 | ||
602 | 592 | ||
@@ -1699,18 +1689,14 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, | |||
1699 | * this content and possibly passes it on (to local clients or other | 1689 | * this content and possibly passes it on (to local clients or other |
1700 | * peers). Does NOT perform migration (content caching at this peer). | 1690 | * peers). Does NOT perform migration (content caching at this peer). |
1701 | * | 1691 | * |
1702 | * @param cp the other peer involved (sender or receiver, NULL | 1692 | * @param cls the other peer involved |
1703 | * for loopback messages where we are both sender and receiver) | 1693 | * @param put the actual message |
1704 | * @param message the actual message | ||
1705 | * @return #GNUNET_OK if the message was well-formed, | ||
1706 | * #GNUNET_SYSERR if the message was malformed (close connection, | ||
1707 | * do not cache under any circumstances) | ||
1708 | */ | 1694 | */ |
1709 | int | 1695 | void |
1710 | GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | 1696 | handle_p2p_put (void *cls, |
1711 | const struct GNUNET_MessageHeader *message) | 1697 | const struct PutMessage *put) |
1712 | { | 1698 | { |
1713 | const struct PutMessage *put; | 1699 | struct GSF_ConnectedPeer *cp = cls; |
1714 | uint16_t msize; | 1700 | uint16_t msize; |
1715 | size_t dsize; | 1701 | size_t dsize; |
1716 | enum GNUNET_BLOCK_Type type; | 1702 | enum GNUNET_BLOCK_Type type; |
@@ -1721,21 +1707,17 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
1721 | double putl; | 1707 | double putl; |
1722 | struct PutMigrationContext *pmc; | 1708 | struct PutMigrationContext *pmc; |
1723 | 1709 | ||
1724 | msize = ntohs (message->size); | 1710 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1725 | if (msize < sizeof (struct PutMessage)) | 1711 | "Received P2P PUT from %s\n", |
1726 | { | 1712 | GNUNET_i2s (GSF_get_peer_performance_data_ (cp)->peer)); |
1727 | GNUNET_break_op (0); | 1713 | GSF_cover_content_count++; |
1728 | return GNUNET_SYSERR; | 1714 | msize = ntohs (put->header.size); |
1729 | } | ||
1730 | put = (const struct PutMessage *) message; | ||
1731 | dsize = msize - sizeof (struct PutMessage); | 1715 | dsize = msize - sizeof (struct PutMessage); |
1732 | type = ntohl (put->type); | 1716 | type = ntohl (put->type); |
1733 | expiration = GNUNET_TIME_absolute_ntoh (put->expiration); | 1717 | expiration = GNUNET_TIME_absolute_ntoh (put->expiration); |
1734 | /* do not allow migrated content to live longer than 1 year */ | 1718 | /* do not allow migrated content to live longer than 1 year */ |
1735 | expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), | 1719 | expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), |
1736 | expiration); | 1720 | expiration); |
1737 | if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) | ||
1738 | return GNUNET_SYSERR; | ||
1739 | if (GNUNET_OK != | 1721 | if (GNUNET_OK != |
1740 | GNUNET_BLOCK_get_key (GSF_block_ctx, | 1722 | GNUNET_BLOCK_get_key (GSF_block_ctx, |
1741 | type, | 1723 | type, |
@@ -1744,7 +1726,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
1744 | &query)) | 1726 | &query)) |
1745 | { | 1727 | { |
1746 | GNUNET_break_op (0); | 1728 | GNUNET_break_op (0); |
1747 | return GNUNET_SYSERR; | 1729 | return; |
1748 | } | 1730 | } |
1749 | GNUNET_STATISTICS_update (GSF_stats, | 1731 | GNUNET_STATISTICS_update (GSF_stats, |
1750 | gettext_noop ("# GAP PUT messages received"), | 1732 | gettext_noop ("# GAP PUT messages received"), |
@@ -1786,11 +1768,19 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
1786 | GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid, | 1768 | GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid, |
1787 | &pmc->origin); | 1769 | &pmc->origin); |
1788 | if (NULL == | 1770 | if (NULL == |
1789 | GNUNET_DATASTORE_put (GSF_dsh, 0, &query, dsize, &put[1], type, | 1771 | GNUNET_DATASTORE_put (GSF_dsh, |
1790 | prq.priority, 1 /* anonymity */ , | 1772 | 0, |
1773 | &query, | ||
1774 | dsize, | ||
1775 | &put[1], | ||
1776 | type, | ||
1777 | prq.priority, | ||
1778 | 1 /* anonymity */ , | ||
1791 | 0 /* replication */ , | 1779 | 0 /* replication */ , |
1792 | expiration, 1 + prq.priority, MAX_DATASTORE_QUEUE, | 1780 | expiration, 1 + prq.priority, |
1793 | &put_migration_continuation, pmc)) | 1781 | MAX_DATASTORE_QUEUE, |
1782 | &put_migration_continuation, | ||
1783 | pmc)) | ||
1794 | { | 1784 | { |
1795 | put_migration_continuation (pmc, | 1785 | put_migration_continuation (pmc, |
1796 | GNUNET_SYSERR, | 1786 | GNUNET_SYSERR, |
@@ -1802,7 +1792,8 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
1802 | { | 1792 | { |
1803 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1793 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1804 | "Choosing not to keep content `%s' (%d/%d)\n", | 1794 | "Choosing not to keep content `%s' (%d/%d)\n", |
1805 | GNUNET_h2s (&query), active_to_migration, | 1795 | GNUNET_h2s (&query), |
1796 | active_to_migration, | ||
1806 | test_put_load_too_high (prq.priority)); | 1797 | test_put_load_too_high (prq.priority)); |
1807 | } | 1798 | } |
1808 | putl = GNUNET_LOAD_get_load (datastore_put_load); | 1799 | putl = GNUNET_LOAD_get_load (datastore_put_load); |
@@ -1826,9 +1817,9 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
1826 | putl, | 1817 | putl, |
1827 | active_to_migration, | 1818 | active_to_migration, |
1828 | (GNUNET_NO == prq.request_found)); | 1819 | (GNUNET_NO == prq.request_found)); |
1829 | GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (block_time)); | 1820 | GSF_block_peer_migration_ (cp, |
1821 | GNUNET_TIME_relative_to_absolute (block_time)); | ||
1830 | } | 1822 | } |
1831 | return GNUNET_OK; | ||
1832 | } | 1823 | } |
1833 | 1824 | ||
1834 | 1825 | ||
diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h index 2765f9b3d..fe4297414 100644 --- a/src/fs/gnunet-service-fs_pr.h +++ b/src/fs/gnunet-service-fs_pr.h | |||
@@ -40,34 +40,34 @@ enum GSF_PendingRequestOptions | |||
40 | */ | 40 | */ |
41 | GSF_PRO_DEFAULTS = 0, | 41 | GSF_PRO_DEFAULTS = 0, |
42 | 42 | ||
43 | /** | 43 | /** |
44 | * Request must only be processed locally. | 44 | * Request must only be processed locally. |
45 | */ | 45 | */ |
46 | GSF_PRO_LOCAL_ONLY = 1, | 46 | GSF_PRO_LOCAL_ONLY = 1, |
47 | 47 | ||
48 | /** | 48 | /** |
49 | * Request must only be forwarded (no routing) | 49 | * Request must only be forwarded (no routing) |
50 | */ | 50 | */ |
51 | GSF_PRO_FORWARD_ONLY = 2, | 51 | GSF_PRO_FORWARD_ONLY = 2, |
52 | 52 | ||
53 | /** | 53 | /** |
54 | * Request persists indefinitely (no expiration). | 54 | * Request persists indefinitely (no expiration). |
55 | */ | 55 | */ |
56 | GSF_PRO_REQUEST_NEVER_EXPIRES = 4, | 56 | GSF_PRO_REQUEST_NEVER_EXPIRES = 4, |
57 | 57 | ||
58 | /** | 58 | /** |
59 | * Request is allowed to refresh bloomfilter and change mingle value. | 59 | * Request is allowed to refresh bloomfilter and change mingle value. |
60 | */ | 60 | */ |
61 | GSF_PRO_BLOOMFILTER_FULL_REFRESH = 8, | 61 | GSF_PRO_BLOOMFILTER_FULL_REFRESH = 8, |
62 | 62 | ||
63 | /** | 63 | /** |
64 | * Request priority is allowed to be exceeded. | 64 | * Request priority is allowed to be exceeded. |
65 | */ | 65 | */ |
66 | GSF_PRO_PRIORITY_UNLIMITED = 16, | 66 | GSF_PRO_PRIORITY_UNLIMITED = 16, |
67 | 67 | ||
68 | /** | 68 | /** |
69 | * Option mask for typical local requests. | 69 | * Option mask for typical local requests. |
70 | */ | 70 | */ |
71 | GSF_PRO_LOCAL_REQUEST = | 71 | GSF_PRO_LOCAL_REQUEST = |
72 | (GSF_PRO_BLOOMFILTER_FULL_REFRESH | GSF_PRO_PRIORITY_UNLIMITED | GSF_PRO_REQUEST_NEVER_EXPIRES) | 72 | (GSF_PRO_BLOOMFILTER_FULL_REFRESH | GSF_PRO_PRIORITY_UNLIMITED | GSF_PRO_REQUEST_NEVER_EXPIRES) |
73 | }; | 73 | }; |
@@ -288,17 +288,13 @@ GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra, | |||
288 | 288 | ||
289 | /** | 289 | /** |
290 | * Generate the message corresponding to the given pending request for | 290 | * Generate the message corresponding to the given pending request for |
291 | * transmission to other peers (or at least determine its size). | 291 | * transmission to other peers. |
292 | * | 292 | * |
293 | * @param pr request to generate the message for | 293 | * @param pr request to generate the message for |
294 | * @param buf_size number of bytes available in @a buf | 294 | * @return envelope with the request message |
295 | * @param buf where to copy the message (can be NULL) | ||
296 | * @return number of bytes needed (if @a buf_size too small) or used | ||
297 | */ | 295 | */ |
298 | size_t | 296 | struct GNUNET_MQ_Envelope * |
299 | GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | 297 | GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr); |
300 | size_t buf_size, | ||
301 | void *buf); | ||
302 | 298 | ||
303 | 299 | ||
304 | /** | 300 | /** |
@@ -344,16 +340,12 @@ GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, | |||
344 | * this content and possibly passes it on (to local clients or other | 340 | * this content and possibly passes it on (to local clients or other |
345 | * peers). Does NOT perform migration (content caching at this peer). | 341 | * peers). Does NOT perform migration (content caching at this peer). |
346 | * | 342 | * |
347 | * @param cp the other peer involved (sender or receiver, NULL | 343 | * @param cls the other peer involved (sender) |
348 | * for loopback messages where we are both sender and receiver) | 344 | * @param put the actual message |
349 | * @param message the actual message | ||
350 | * @return #GNUNET_OK if the message was well-formed, | ||
351 | * #GNUNET_SYSERR if the message was malformed (close connection, | ||
352 | * do not cache under any circumstances) | ||
353 | */ | 345 | */ |
354 | int | 346 | void |
355 | GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | 347 | handle_p2p_put (void *cls, |
356 | const struct GNUNET_MessageHeader *message); | 348 | const struct PutMessage *put); |
357 | 349 | ||
358 | 350 | ||
359 | /** | 351 | /** |
diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c index 59f3772f5..1573bc160 100644 --- a/src/fs/gnunet-service-fs_push.c +++ b/src/fs/gnunet-service-fs_push.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2011 GNUnet e.V. | 3 | Copyright (C) 2011, 2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -102,8 +102,7 @@ struct MigrationReadyBlock | |||
102 | 102 | ||
103 | 103 | ||
104 | /** | 104 | /** |
105 | * Information about a peer waiting for | 105 | * Information about a peer waiting for migratable data. |
106 | * migratable data. | ||
107 | */ | 106 | */ |
108 | struct MigrationReadyPeer | 107 | struct MigrationReadyPeer |
109 | { | 108 | { |
@@ -123,15 +122,9 @@ struct MigrationReadyPeer | |||
123 | struct GSF_ConnectedPeer *peer; | 122 | struct GSF_ConnectedPeer *peer; |
124 | 123 | ||
125 | /** | 124 | /** |
126 | * Handle for current transmission request, | 125 | * Envelope of the currently pushed message. |
127 | * or NULL for none. | ||
128 | */ | 126 | */ |
129 | struct GSF_PeerTransmitHandle *th; | 127 | struct GNUNET_MQ_Envelope *env; |
130 | |||
131 | /** | ||
132 | * Message we are trying to push right now (or NULL) | ||
133 | */ | ||
134 | struct PutMessage *msg; | ||
135 | }; | 128 | }; |
136 | 129 | ||
137 | 130 | ||
@@ -163,7 +156,7 @@ static struct GNUNET_DATASTORE_QueueEntry *mig_qe; | |||
163 | /** | 156 | /** |
164 | * ID of task that collects blocks for migration. | 157 | * ID of task that collects blocks for migration. |
165 | */ | 158 | */ |
166 | static struct GNUNET_SCHEDULER_Task * mig_task; | 159 | static struct GNUNET_SCHEDULER_Task *mig_task; |
167 | 160 | ||
168 | /** | 161 | /** |
169 | * What is the maximum frequency at which we are allowed to | 162 | * What is the maximum frequency at which we are allowed to |
@@ -195,8 +188,11 @@ static int value_found; | |||
195 | static void | 188 | static void |
196 | delete_migration_block (struct MigrationReadyBlock *mb) | 189 | delete_migration_block (struct MigrationReadyBlock *mb) |
197 | { | 190 | { |
198 | GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb); | 191 | GNUNET_CONTAINER_DLL_remove (mig_head, |
199 | GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE); | 192 | mig_tail, |
193 | mb); | ||
194 | GNUNET_PEER_decrement_rcs (mb->target_list, | ||
195 | MIGRATION_LIST_SIZE); | ||
200 | mig_size--; | 196 | mig_size--; |
201 | GNUNET_free (mb); | 197 | GNUNET_free (mb); |
202 | } | 198 | } |
@@ -204,49 +200,11 @@ delete_migration_block (struct MigrationReadyBlock *mb) | |||
204 | 200 | ||
205 | /** | 201 | /** |
206 | * Find content for migration to this peer. | 202 | * Find content for migration to this peer. |
207 | */ | ||
208 | static void | ||
209 | find_content (struct MigrationReadyPeer *mrp); | ||
210 | |||
211 | |||
212 | /** | ||
213 | * Transmit the message currently scheduled for transmission. | ||
214 | * | 203 | * |
215 | * @param cls the `struct MigrationReadyPeer` | 204 | * @param cls a `struct MigrationReadyPeer *` |
216 | * @param buf_size number of bytes available in @a buf | ||
217 | * @param buf where to copy the message, NULL on error (peer disconnect) | ||
218 | * @return number of bytes copied to @a buf, can be 0 (without indicating an error) | ||
219 | */ | 205 | */ |
220 | static size_t | 206 | static void |
221 | transmit_message (void *cls, | 207 | find_content (void *cls); |
222 | size_t buf_size, | ||
223 | void *buf) | ||
224 | { | ||
225 | struct MigrationReadyPeer *peer = cls; | ||
226 | struct PutMessage *msg; | ||
227 | uint16_t msize; | ||
228 | |||
229 | peer->th = NULL; | ||
230 | msg = peer->msg; | ||
231 | peer->msg = NULL; | ||
232 | if (NULL == buf) | ||
233 | { | ||
234 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
235 | "Failed to migrate content to another peer (disconnect)\n"); | ||
236 | GNUNET_free (msg); | ||
237 | return 0; | ||
238 | } | ||
239 | msize = ntohs (msg->header.size); | ||
240 | GNUNET_assert (msize <= buf_size); | ||
241 | GNUNET_memcpy (buf, msg, msize); | ||
242 | GNUNET_free (msg); | ||
243 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
244 | "Pushing %u bytes to %s\n", | ||
245 | msize, | ||
246 | GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); | ||
247 | find_content (peer); | ||
248 | return msize; | ||
249 | } | ||
250 | 208 | ||
251 | 209 | ||
252 | /** | 210 | /** |
@@ -257,31 +215,30 @@ transmit_message (void *cls, | |||
257 | * @return #GNUNET_YES if the block was deleted (!) | 215 | * @return #GNUNET_YES if the block was deleted (!) |
258 | */ | 216 | */ |
259 | static int | 217 | static int |
260 | transmit_content (struct MigrationReadyPeer *peer, | 218 | transmit_content (struct MigrationReadyPeer *mrp, |
261 | struct MigrationReadyBlock *block) | 219 | struct MigrationReadyBlock *block) |
262 | { | 220 | { |
263 | size_t msize; | ||
264 | struct PutMessage *msg; | 221 | struct PutMessage *msg; |
265 | unsigned int i; | 222 | unsigned int i; |
266 | struct GSF_PeerPerformanceData *ppd; | 223 | struct GSF_PeerPerformanceData *ppd; |
267 | int ret; | 224 | int ret; |
268 | 225 | ||
269 | ppd = GSF_get_peer_performance_data_ (peer->peer); | 226 | ppd = GSF_get_peer_performance_data_ (mrp->peer); |
270 | GNUNET_assert (NULL == peer->th); | 227 | mrp->env = GNUNET_MQ_msg_extra (msg, |
271 | msize = sizeof (struct PutMessage) + block->size; | 228 | block->size, |
272 | msg = GNUNET_malloc (msize); | 229 | GNUNET_MESSAGE_TYPE_FS_PUT); |
273 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); | ||
274 | msg->header.size = htons (msize); | ||
275 | msg->type = htonl (block->type); | 230 | msg->type = htonl (block->type); |
276 | msg->expiration = GNUNET_TIME_absolute_hton (block->expiration); | 231 | msg->expiration = GNUNET_TIME_absolute_hton (block->expiration); |
277 | GNUNET_memcpy (&msg[1], &block[1], block->size); | 232 | GNUNET_memcpy (&msg[1], |
278 | peer->msg = msg; | 233 | &block[1], |
234 | block->size); | ||
279 | for (i = 0; i < MIGRATION_LIST_SIZE; i++) | 235 | for (i = 0; i < MIGRATION_LIST_SIZE; i++) |
280 | { | 236 | { |
281 | if (block->target_list[i] == 0) | 237 | if (block->target_list[i] == 0) |
282 | { | 238 | { |
283 | block->target_list[i] = ppd->pid; | 239 | block->target_list[i] = ppd->pid; |
284 | GNUNET_PEER_change_rc (block->target_list[i], 1); | 240 | GNUNET_PEER_change_rc (block->target_list[i], |
241 | 1); | ||
285 | break; | 242 | break; |
286 | } | 243 | } |
287 | } | 244 | } |
@@ -294,15 +251,13 @@ transmit_content (struct MigrationReadyPeer *peer, | |||
294 | { | 251 | { |
295 | ret = GNUNET_NO; | 252 | ret = GNUNET_NO; |
296 | } | 253 | } |
297 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 254 | GNUNET_MQ_notify_sent (mrp->env, |
298 | "Asking for transmission of %u bytes to %s for migration\n", | 255 | &find_content, |
299 | (unsigned int) msize, | 256 | mrp); |
300 | GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); | 257 | GSF_peer_transmit_ (mrp->peer, |
301 | peer->th = GSF_peer_transmit_ (peer->peer, | 258 | GNUNET_NO, |
302 | GNUNET_NO, 0 /* priority */ , | 259 | 0 /* priority */ , |
303 | GNUNET_TIME_UNIT_FOREVER_REL, | 260 | mrp->env); |
304 | msize, | ||
305 | &transmit_message, peer); | ||
306 | return ret; | 261 | return ret; |
307 | } | 262 | } |
308 | 263 | ||
@@ -330,12 +285,12 @@ count_targets (struct MigrationReadyBlock *block) | |||
330 | * Check if sending this block to this peer would | 285 | * Check if sending this block to this peer would |
331 | * be a good idea. | 286 | * be a good idea. |
332 | * | 287 | * |
333 | * @param peer target peer | 288 | * @param mrp target peer |
334 | * @param block the block | 289 | * @param block the block |
335 | * @return score (>= 0: feasible, negative: infeasible) | 290 | * @return score (>= 0: feasible, negative: infeasible) |
336 | */ | 291 | */ |
337 | static long | 292 | static long |
338 | score_content (struct MigrationReadyPeer *peer, | 293 | score_content (struct MigrationReadyPeer *mrp, |
339 | struct MigrationReadyBlock *block) | 294 | struct MigrationReadyBlock *block) |
340 | { | 295 | { |
341 | unsigned int i; | 296 | unsigned int i; |
@@ -344,14 +299,18 @@ score_content (struct MigrationReadyPeer *peer, | |||
344 | struct GNUNET_HashCode hc; | 299 | struct GNUNET_HashCode hc; |
345 | uint32_t dist; | 300 | uint32_t dist; |
346 | 301 | ||
347 | ppd = GSF_get_peer_performance_data_ (peer->peer); | 302 | ppd = GSF_get_peer_performance_data_ (mrp->peer); |
348 | for (i = 0; i < MIGRATION_LIST_SIZE; i++) | 303 | for (i = 0; i < MIGRATION_LIST_SIZE; i++) |
349 | if (block->target_list[i] == ppd->pid) | 304 | if (block->target_list[i] == ppd->pid) |
350 | return -1; | 305 | return -1; |
351 | GNUNET_assert (0 != ppd->pid); | 306 | GNUNET_assert (0 != ppd->pid); |
352 | GNUNET_PEER_resolve (ppd->pid, &id); | 307 | GNUNET_PEER_resolve (ppd->pid, |
353 | GNUNET_CRYPTO_hash (&id, sizeof (struct GNUNET_PeerIdentity), &hc); | 308 | &id); |
354 | dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &hc); | 309 | GNUNET_CRYPTO_hash (&id, |
310 | sizeof (struct GNUNET_PeerIdentity), | ||
311 | &hc); | ||
312 | dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, | ||
313 | &hc); | ||
355 | /* closer distance, higher score: */ | 314 | /* closer distance, higher score: */ |
356 | return UINT32_MAX - dist; | 315 | return UINT32_MAX - dist; |
357 | } | 316 | } |
@@ -368,17 +327,18 @@ consider_gathering (void); | |||
368 | /** | 327 | /** |
369 | * Find content for migration to this peer. | 328 | * Find content for migration to this peer. |
370 | * | 329 | * |
371 | * @param mrp peer to find content for | 330 | * @param cls peer to find content for |
372 | */ | 331 | */ |
373 | static void | 332 | static void |
374 | find_content (struct MigrationReadyPeer *mrp) | 333 | find_content (void *cls) |
375 | { | 334 | { |
335 | struct MigrationReadyPeer *mrp = cls; | ||
376 | struct MigrationReadyBlock *pos; | 336 | struct MigrationReadyBlock *pos; |
377 | long score; | 337 | long score; |
378 | long best_score; | 338 | long best_score; |
379 | struct MigrationReadyBlock *best; | 339 | struct MigrationReadyBlock *best; |
380 | 340 | ||
381 | GNUNET_assert (NULL == mrp->th); | 341 | mrp->env = NULL; |
382 | best = NULL; | 342 | best = NULL; |
383 | best_score = -1; | 343 | best_score = -1; |
384 | pos = mig_head; | 344 | pos = mig_head; |
@@ -423,7 +383,8 @@ find_content (struct MigrationReadyPeer *mrp) | |||
423 | } | 383 | } |
424 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 384 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
425 | "Preparing to push best content to peer\n"); | 385 | "Preparing to push best content to peer\n"); |
426 | transmit_content (mrp, best); | 386 | transmit_content (mrp, |
387 | best); | ||
427 | } | 388 | } |
428 | 389 | ||
429 | 390 | ||
@@ -454,9 +415,12 @@ consider_gathering () | |||
454 | return; | 415 | return; |
455 | if (mig_size >= MAX_MIGRATION_QUEUE) | 416 | if (mig_size >= MAX_MIGRATION_QUEUE) |
456 | return; | 417 | return; |
457 | delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size); | 418 | delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, |
458 | delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE); | 419 | mig_size); |
459 | delay = GNUNET_TIME_relative_max (delay, min_migration_delay); | 420 | delay = GNUNET_TIME_relative_divide (delay, |
421 | MAX_MIGRATION_QUEUE); | ||
422 | delay = GNUNET_TIME_relative_max (delay, | ||
423 | min_migration_delay); | ||
460 | if (GNUNET_NO == value_found) | 424 | if (GNUNET_NO == value_found) |
461 | { | 425 | { |
462 | /* wait at least 5s if the datastore is empty */ | 426 | /* wait at least 5s if the datastore is empty */ |
@@ -467,8 +431,9 @@ consider_gathering () | |||
467 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 431 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
468 | "Scheduling gathering task (queue size: %u)\n", | 432 | "Scheduling gathering task (queue size: %u)\n", |
469 | mig_size); | 433 | mig_size); |
470 | mig_task = | 434 | mig_task = GNUNET_SCHEDULER_add_delayed (delay, |
471 | GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL); | 435 | &gather_migration_blocks, |
436 | NULL); | ||
472 | } | 437 | } |
473 | 438 | ||
474 | 439 | ||
@@ -549,14 +514,12 @@ process_migration_content (void *cls, | |||
549 | mig_size++; | 514 | mig_size++; |
550 | for (pos = peer_head; NULL != pos; pos = pos->next) | 515 | for (pos = peer_head; NULL != pos; pos = pos->next) |
551 | { | 516 | { |
552 | if (NULL == pos->th) | 517 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
553 | { | 518 | "Preparing to push best content to peer %s\n", |
554 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 519 | GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer))); |
555 | "Preparing to push best content to peer %s\n", | 520 | if (GNUNET_YES == transmit_content (pos, |
556 | GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer))); | 521 | mb)) |
557 | if (GNUNET_YES == transmit_content (pos, mb)) | 522 | break; /* 'mb' was freed! */ |
558 | break; /* 'mb' was freed! */ | ||
559 | } | ||
560 | } | 523 | } |
561 | consider_gathering (); | 524 | consider_gathering (); |
562 | } | 525 | } |
@@ -580,9 +543,11 @@ gather_migration_blocks (void *cls) | |||
580 | "Asking datastore for content for replication (queue size: %u)\n", | 543 | "Asking datastore for content for replication (queue size: %u)\n", |
581 | mig_size); | 544 | mig_size); |
582 | value_found = GNUNET_NO; | 545 | value_found = GNUNET_NO; |
583 | mig_qe = | 546 | mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh, |
584 | GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX, | 547 | 0, |
585 | &process_migration_content, NULL); | 548 | UINT_MAX, |
549 | &process_migration_content, | ||
550 | NULL); | ||
586 | if (NULL == mig_qe) | 551 | if (NULL == mig_qe) |
587 | consider_gathering (); | 552 | consider_gathering (); |
588 | } | 553 | } |
@@ -640,19 +605,11 @@ GSF_push_stop_ (struct GSF_ConnectedPeer *peer) | |||
640 | break; | 605 | break; |
641 | if (NULL == pos) | 606 | if (NULL == pos) |
642 | return; | 607 | return; |
608 | if (NULL != pos->env) | ||
609 | GNUNET_MQ_send_cancel (pos->env); | ||
643 | GNUNET_CONTAINER_DLL_remove (peer_head, | 610 | GNUNET_CONTAINER_DLL_remove (peer_head, |
644 | peer_tail, | 611 | peer_tail, |
645 | pos); | 612 | pos); |
646 | if (NULL != pos->th) | ||
647 | { | ||
648 | GSF_peer_transmit_cancel_ (pos->th); | ||
649 | pos->th = NULL; | ||
650 | } | ||
651 | if (NULL != pos->msg) | ||
652 | { | ||
653 | GNUNET_free (pos->msg); | ||
654 | pos->msg = NULL; | ||
655 | } | ||
656 | GNUNET_free (pos); | 613 | GNUNET_free (pos); |
657 | } | 614 | } |
658 | 615 | ||
@@ -664,16 +621,21 @@ void | |||
664 | GSF_push_init_ () | 621 | GSF_push_init_ () |
665 | { | 622 | { |
666 | enabled = | 623 | enabled = |
667 | GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING"); | 624 | GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, |
625 | "FS", | ||
626 | "CONTENT_PUSHING"); | ||
668 | if (GNUNET_YES != enabled) | 627 | if (GNUNET_YES != enabled) |
669 | return; | 628 | return; |
670 | 629 | ||
671 | if (GNUNET_OK != | 630 | if (GNUNET_OK != |
672 | GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY", | 631 | GNUNET_CONFIGURATION_get_value_time (GSF_cfg, |
632 | "fs", | ||
633 | "MIN_MIGRATION_DELAY", | ||
673 | &min_migration_delay)) | 634 | &min_migration_delay)) |
674 | { | 635 | { |
675 | GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING, | 636 | GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING, |
676 | "fs", "MIN_MIGRATION_DELAY", | 637 | "fs", |
638 | "MIN_MIGRATION_DELAY", | ||
677 | _("time required, content pushing disabled")); | 639 | _("time required, content pushing disabled")); |
678 | return; | 640 | return; |
679 | } | 641 | } |