diff options
author | Christian Grothoff <christian@grothoff.org> | 2011-04-21 15:14:14 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2011-04-21 15:14:14 +0000 |
commit | 41700b6cb0345080513d82bdbe2dbe33c8654491 (patch) | |
tree | feab3a7f27cdadd01897239c86cbdd137da55b8f /src | |
parent | bbf3c4e04289295df264539e1017f3778f45f97f (diff) | |
download | gnunet-41700b6cb0345080513d82bdbe2dbe33c8654491.tar.gz gnunet-41700b6cb0345080513d82bdbe2dbe33c8654491.zip |
wip
Diffstat (limited to 'src')
-rw-r--r-- | src/fs/fs_download.c | 2 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_cp.c | 299 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_lc.c | 46 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_pe.c | 73 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_pr.c | 2 | ||||
-rw-r--r-- | src/fs/test_gnunet_service_fs_p2p.c | 4 |
6 files changed, 257 insertions, 169 deletions
diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c index 785803edf..8192b8c1f 100644 --- a/src/fs/fs_download.c +++ b/src/fs/fs_download.c | |||
@@ -31,7 +31,7 @@ | |||
31 | #include "fs.h" | 31 | #include "fs.h" |
32 | #include "fs_tree.h" | 32 | #include "fs_tree.h" |
33 | 33 | ||
34 | #define DEBUG_DOWNLOAD GNUNET_YES | 34 | #define DEBUG_DOWNLOAD GNUNET_NO |
35 | 35 | ||
36 | /** | 36 | /** |
37 | * Determine if the given download (options and meta data) should cause | 37 | * Determine if the given download (options and meta data) should cause |
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index 5aba83298..2522cbe7b 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c | |||
@@ -105,6 +105,11 @@ struct GSF_PeerTransmitHandle | |||
105 | int is_query; | 105 | int is_query; |
106 | 106 | ||
107 | /** | 107 | /** |
108 | * Did we get a reservation already? | ||
109 | */ | ||
110 | int was_reserved; | ||
111 | |||
112 | /** | ||
108 | * Priority of this request. | 113 | * Priority of this request. |
109 | */ | 114 | */ |
110 | uint32_t priority; | 115 | uint32_t priority; |
@@ -113,6 +118,30 @@ struct GSF_PeerTransmitHandle | |||
113 | 118 | ||
114 | 119 | ||
115 | /** | 120 | /** |
121 | * Information per peer and request. | ||
122 | */ | ||
123 | struct PeerRequest | ||
124 | { | ||
125 | |||
126 | /** | ||
127 | * Handle to generic request. | ||
128 | */ | ||
129 | struct GSF_PendingRequest *pr; | ||
130 | |||
131 | /** | ||
132 | * Handle to specific peer. | ||
133 | */ | ||
134 | struct GSF_ConnectedPeer *cp; | ||
135 | |||
136 | /** | ||
137 | * Task for asynchronous stopping of this request. | ||
138 | */ | ||
139 | GNUNET_SCHEDULER_TaskIdentifier kill_task; | ||
140 | |||
141 | }; | ||
142 | |||
143 | |||
144 | /** | ||
116 | * A connected peer. | 145 | * A connected peer. |
117 | */ | 146 | */ |
118 | struct GSF_ConnectedPeer | 147 | struct GSF_ConnectedPeer |
@@ -162,7 +191,7 @@ struct GSF_ConnectedPeer | |||
162 | GNUNET_SCHEDULER_TaskIdentifier irc_delay_task; | 191 | GNUNET_SCHEDULER_TaskIdentifier irc_delay_task; |
163 | 192 | ||
164 | /** | 193 | /** |
165 | * Active requests from this neighbour. | 194 | * Active requests from this neighbour, map of query to 'struct PeerRequest'. |
166 | */ | 195 | */ |
167 | struct GNUNET_CONTAINER_MultiHashMap *request_map; | 196 | struct GNUNET_CONTAINER_MultiHashMap *request_map; |
168 | 197 | ||
@@ -299,12 +328,99 @@ GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp) | |||
299 | static size_t | 328 | static size_t |
300 | peer_transmit_ready_cb (void *cls, | 329 | peer_transmit_ready_cb (void *cls, |
301 | size_t size, | 330 | size_t size, |
331 | void *buf); | ||
332 | |||
333 | |||
334 | |||
335 | |||
336 | /** | ||
337 | * Function called by core upon success or failure of our bandwidth reservation request. | ||
338 | * | ||
339 | * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request | ||
340 | * @param peer identifies the peer | ||
341 | * @param bandwidth_out available amount of outbound bandwidth | ||
342 | * @param amount set to the amount that was actually reserved or unreserved; | ||
343 | * either the full requested amount or zero (no partial reservations) | ||
344 | * @param res_delay if the reservation could not be satisfied (amount was 0), how | ||
345 | * long should the client wait until re-trying? | ||
346 | * @param preference current traffic preference for the given peer | ||
347 | */ | ||
348 | static void | ||
349 | core_reserve_callback (void *cls, | ||
350 | const struct GNUNET_PeerIdentity *peer, | ||
351 | struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, | ||
352 | int32_t amount, | ||
353 | struct GNUNET_TIME_Relative res_delay, | ||
354 | uint64_t preference); | ||
355 | |||
356 | |||
357 | /** | ||
358 | * If ready (bandwidth reserved), try to schedule transmission via | ||
359 | * core for the given handle. | ||
360 | * | ||
361 | * @param pth transmission handle to schedule | ||
362 | */ | ||
363 | static void | ||
364 | schedule_transmission (struct GSF_PeerTransmitHandle *pth) | ||
365 | { | ||
366 | struct GSF_ConnectedPeer *cp; | ||
367 | struct GNUNET_PeerIdentity target; | ||
368 | uint64_t ip; | ||
369 | |||
370 | if (NULL != pth->cth) | ||
371 | return; /* already done */ | ||
372 | cp = pth->cp; | ||
373 | GNUNET_PEER_resolve (cp->ppd.pid, | ||
374 | &target); | ||
375 | if ( (GNUNET_YES == pth->is_query) && | ||
376 | (GNUNET_YES != pth->was_reserved) ) | ||
377 | { | ||
378 | /* query, need reservation */ | ||
379 | if (GNUNET_YES != cp->did_reserve) | ||
380 | return; /* not ready */ | ||
381 | cp->did_reserve = GNUNET_NO; | ||
382 | /* reservation already done! */ | ||
383 | pth->was_reserved = GNUNET_YES; | ||
384 | ip = cp->inc_preference; | ||
385 | cp->inc_preference = 0; | ||
386 | cp->irc = GNUNET_CORE_peer_change_preference (GSF_core, | ||
387 | &target, | ||
388 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
389 | GNUNET_BANDWIDTH_VALUE_MAX, | ||
390 | DBLOCK_SIZE, | ||
391 | ip, | ||
392 | &core_reserve_callback, | ||
393 | cp); | ||
394 | } | ||
395 | pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core, | ||
396 | GNUNET_YES, | ||
397 | pth->priority, | ||
398 | GNUNET_TIME_absolute_get_remaining (pth->timeout), | ||
399 | &target, | ||
400 | pth->size, | ||
401 | &peer_transmit_ready_cb, | ||
402 | pth); | ||
403 | } | ||
404 | |||
405 | |||
406 | /** | ||
407 | * Core is ready to transmit to a peer, get the message. | ||
408 | * | ||
409 | * @param cls the 'struct GSF_PeerTransmitHandle' of the message | ||
410 | * @param size number of bytes core is willing to take | ||
411 | * @param buf where to copy the message | ||
412 | * @return number of bytes copied to buf | ||
413 | */ | ||
414 | static size_t | ||
415 | peer_transmit_ready_cb (void *cls, | ||
416 | size_t size, | ||
302 | void *buf) | 417 | void *buf) |
303 | { | 418 | { |
304 | struct GSF_PeerTransmitHandle *pth = cls; | 419 | struct GSF_PeerTransmitHandle *pth = cls; |
305 | struct GSF_ConnectedPeer *cp; | 420 | struct GSF_ConnectedPeer *cp; |
306 | size_t ret; | 421 | size_t ret; |
307 | 422 | ||
423 | pth->cth = NULL; | ||
308 | if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK) | 424 | if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK) |
309 | { | 425 | { |
310 | GNUNET_SCHEDULER_cancel (pth->timeout_task); | 426 | GNUNET_SCHEDULER_cancel (pth->timeout_task); |
@@ -327,33 +443,14 @@ peer_transmit_ready_cb (void *cls, | |||
327 | GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value); | 443 | GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value); |
328 | ret = pth->gmc (pth->gmc_cls, | 444 | ret = pth->gmc (pth->gmc_cls, |
329 | size, buf); | 445 | size, buf); |
330 | GNUNET_free (pth); | 446 | GNUNET_free (pth); |
447 | for (pth = cp->pth_head; pth != NULL; pth = pth->next) | ||
448 | schedule_transmission (pth); | ||
331 | return ret; | 449 | return ret; |
332 | } | 450 | } |
333 | 451 | ||
334 | 452 | ||
335 | /** | 453 | /** |
336 | * Function called by core upon success or failure of our bandwidth reservation request. | ||
337 | * | ||
338 | * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request | ||
339 | * @param peer identifies the peer | ||
340 | * @param bandwidth_out available amount of outbound bandwidth | ||
341 | * @param amount set to the amount that was actually reserved or unreserved; | ||
342 | * either the full requested amount or zero (no partial reservations) | ||
343 | * @param res_delay if the reservation could not be satisfied (amount was 0), how | ||
344 | * long should the client wait until re-trying? | ||
345 | * @param preference current traffic preference for the given peer | ||
346 | */ | ||
347 | static void | ||
348 | core_reserve_callback (void *cls, | ||
349 | const struct GNUNET_PeerIdentity *peer, | ||
350 | struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, | ||
351 | int32_t amount, | ||
352 | struct GNUNET_TIME_Relative res_delay, | ||
353 | uint64_t preference); | ||
354 | |||
355 | |||
356 | /** | ||
357 | * (re)try to reserve bandwidth from the given peer. | 454 | * (re)try to reserve bandwidth from the given peer. |
358 | * | 455 | * |
359 | * @param cls the 'struct GSF_ConnectedPeer' to reserve from | 456 | * @param cls the 'struct GSF_ConnectedPeer' to reserve from |
@@ -607,6 +704,36 @@ copy_reply (void *cls, | |||
607 | 704 | ||
608 | 705 | ||
609 | /** | 706 | /** |
707 | * Free the given client request. | ||
708 | * | ||
709 | * @param cls the client request to free | ||
710 | * @param tc task context | ||
711 | */ | ||
712 | static void | ||
713 | peer_request_destroy (void *cls, | ||
714 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
715 | { | ||
716 | struct PeerRequest *peerreq = cls; | ||
717 | struct GSF_PendingRequest *pr = peerreq->pr; | ||
718 | struct GSF_ConnectedPeer *cp = peerreq->cp; | ||
719 | struct GSF_PendingRequestData *prd; | ||
720 | |||
721 | peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK; | ||
722 | prd = GSF_pending_request_get_data_ (pr); | ||
723 | GNUNET_STATISTICS_update (GSF_stats, | ||
724 | gettext_noop ("# P2P searches active"), | ||
725 | -1, | ||
726 | GNUNET_NO); | ||
727 | GNUNET_break (GNUNET_OK == | ||
728 | GNUNET_CONTAINER_multihashmap_remove (cp->request_map, | ||
729 | &prd->query, | ||
730 | peerreq)); | ||
731 | GSF_pending_request_cancel_ (pr); | ||
732 | GNUNET_free (peerreq); | ||
733 | } | ||
734 | |||
735 | |||
736 | /** | ||
610 | * Handle a reply to a pending request. Also called if a request | 737 | * Handle a reply to a pending request. Also called if a request |
611 | * expires (then with data == NULL). The handler may be called | 738 | * expires (then with data == NULL). The handler may be called |
612 | * many times (depending on the request type), but will not be | 739 | * many times (depending on the request type), but will not be |
@@ -614,8 +741,7 @@ copy_reply (void *cls, | |||
614 | * and will also not be called anymore after a call signalling | 741 | * and will also not be called anymore after a call signalling |
615 | * expiration. | 742 | * expiration. |
616 | * | 743 | * |
617 | * @param cls 'struct GSF_ConnectedPeer' of the peer that would | 744 | * @param cls 'struct PeerRequest' this is an answer for |
618 | * have liked an answer to the request | ||
619 | * @param eval evaluation of the result | 745 | * @param eval evaluation of the result |
620 | * @param pr handle to the original pending request | 746 | * @param pr handle to the original pending request |
621 | * @param expiration when does 'data' expire? | 747 | * @param expiration when does 'data' expire? |
@@ -632,12 +758,14 @@ handle_p2p_reply (void *cls, | |||
632 | const void *data, | 758 | const void *data, |
633 | size_t data_len) | 759 | size_t data_len) |
634 | { | 760 | { |
635 | struct GSF_ConnectedPeer *cp = cls; | 761 | struct PeerRequest *peerreq = cls; |
762 | struct GSF_ConnectedPeer *cp = peerreq->cp; | ||
636 | struct GSF_PendingRequestData *prd; | 763 | struct GSF_PendingRequestData *prd; |
637 | struct PutMessage *pm; | 764 | struct PutMessage *pm; |
638 | size_t msize; | 765 | size_t msize; |
639 | 766 | ||
640 | GNUNET_assert (data_len < GNUNET_SERVER_MAX_MESSAGE_SIZE); | 767 | GNUNET_assert (data_len < GNUNET_SERVER_MAX_MESSAGE_SIZE); |
768 | GNUNET_assert (peerreq->pr == pr); | ||
641 | prd = GSF_pending_request_get_data_ (pr); | 769 | prd = GSF_pending_request_get_data_ (pr); |
642 | if (NULL == data) | 770 | if (NULL == data) |
643 | { | 771 | { |
@@ -648,7 +776,8 @@ handle_p2p_reply (void *cls, | |||
648 | GNUNET_break (GNUNET_OK == | 776 | GNUNET_break (GNUNET_OK == |
649 | GNUNET_CONTAINER_multihashmap_remove (cp->request_map, | 777 | GNUNET_CONTAINER_multihashmap_remove (cp->request_map, |
650 | &prd->query, | 778 | &prd->query, |
651 | pr)); | 779 | peerreq)); |
780 | GNUNET_free (peerreq); | ||
652 | return; | 781 | return; |
653 | } | 782 | } |
654 | GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY); | 783 | GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY); |
@@ -687,15 +816,8 @@ handle_p2p_reply (void *cls, | |||
687 | pm); | 816 | pm); |
688 | if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) | 817 | if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) |
689 | return; | 818 | return; |
690 | GNUNET_STATISTICS_update (GSF_stats, | 819 | peerreq->kill_task = GNUNET_SCHEDULER_add_now (&peer_request_destroy, |
691 | gettext_noop ("# P2P searches active"), | 820 | peerreq); |
692 | -1, | ||
693 | GNUNET_NO); | ||
694 | GNUNET_break (GNUNET_OK == | ||
695 | GNUNET_CONTAINER_multihashmap_remove (cp->request_map, | ||
696 | &prd->query, | ||
697 | pr)); | ||
698 | GSF_pending_request_cancel_ (pr); | ||
699 | } | 821 | } |
700 | 822 | ||
701 | 823 | ||
@@ -844,6 +966,7 @@ struct GSF_PendingRequest * | |||
844 | GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | 966 | GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, |
845 | const struct GNUNET_MessageHeader *message) | 967 | const struct GNUNET_MessageHeader *message) |
846 | { | 968 | { |
969 | struct PeerRequest *peerreq; | ||
847 | struct GSF_PendingRequest *pr; | 970 | struct GSF_PendingRequest *pr; |
848 | struct GSF_PendingRequestData *prd; | 971 | struct GSF_PendingRequestData *prd; |
849 | struct GSF_ConnectedPeer *cp; | 972 | struct GSF_ConnectedPeer *cp; |
@@ -1002,10 +1125,11 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1002 | ttl -= ttl_decrement; | 1125 | ttl -= ttl_decrement; |
1003 | 1126 | ||
1004 | /* test if the request already exists */ | 1127 | /* test if the request already exists */ |
1005 | pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map, | 1128 | peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map, |
1006 | &gm->query); | 1129 | &gm->query); |
1007 | if (pr != NULL) | 1130 | if (peerreq != NULL) |
1008 | { | 1131 | { |
1132 | pr = peerreq->pr; | ||
1009 | prd = GSF_pending_request_get_data_ (pr); | 1133 | prd = GSF_pending_request_get_data_ (pr); |
1010 | if ( (prd->type == type) && | 1134 | if ( (prd->type == type) && |
1011 | ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) || | 1135 | ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) || |
@@ -1034,10 +1158,15 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1034 | GNUNET_assert (GNUNET_YES == | 1158 | GNUNET_assert (GNUNET_YES == |
1035 | GNUNET_CONTAINER_multihashmap_remove (cp->request_map, | 1159 | GNUNET_CONTAINER_multihashmap_remove (cp->request_map, |
1036 | &gm->query, | 1160 | &gm->query, |
1037 | pr)); | 1161 | peerreq)); |
1162 | if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK) | ||
1163 | GNUNET_SCHEDULER_cancel (peerreq->kill_task); | ||
1164 | GNUNET_free (peerreq); | ||
1038 | } | 1165 | } |
1039 | } | 1166 | } |
1040 | 1167 | ||
1168 | peerreq = GNUNET_malloc (sizeof (struct PeerRequest)); | ||
1169 | peerreq->cp = cp; | ||
1041 | pr = GSF_pending_request_create_ (options, | 1170 | pr = GSF_pending_request_create_ (options, |
1042 | type, | 1171 | type, |
1043 | &gm->query, | 1172 | &gm->query, |
@@ -1052,11 +1181,12 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1052 | spid, | 1181 | spid, |
1053 | NULL, 0, /* replies_seen */ | 1182 | NULL, 0, /* replies_seen */ |
1054 | &handle_p2p_reply, | 1183 | &handle_p2p_reply, |
1055 | cp); | 1184 | peerreq); |
1185 | peerreq->pr = pr; | ||
1056 | GNUNET_break (GNUNET_OK == | 1186 | GNUNET_break (GNUNET_OK == |
1057 | GNUNET_CONTAINER_multihashmap_put (cp->request_map, | 1187 | GNUNET_CONTAINER_multihashmap_put (cp->request_map, |
1058 | &gm->query, | 1188 | &gm->query, |
1059 | pr, | 1189 | peerreq, |
1060 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | 1190 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); |
1061 | GNUNET_STATISTICS_update (GSF_stats, | 1191 | GNUNET_STATISTICS_update (GSF_stats, |
1062 | gettext_noop ("# P2P searches received"), | 1192 | gettext_noop ("# P2P searches received"), |
@@ -1131,9 +1261,6 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, | |||
1131 | struct GSF_PeerTransmitHandle *pth; | 1261 | struct GSF_PeerTransmitHandle *pth; |
1132 | struct GSF_PeerTransmitHandle *pos; | 1262 | struct GSF_PeerTransmitHandle *pos; |
1133 | struct GSF_PeerTransmitHandle *prev; | 1263 | struct GSF_PeerTransmitHandle *prev; |
1134 | struct GNUNET_PeerIdentity target; | ||
1135 | uint64_t ip; | ||
1136 | int is_ready; | ||
1137 | 1264 | ||
1138 | pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle)); | 1265 | pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle)); |
1139 | pth->transmission_request_start_time = GNUNET_TIME_absolute_get (); | 1266 | pth->transmission_request_start_time = GNUNET_TIME_absolute_get (); |
@@ -1162,79 +1289,15 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, | |||
1162 | cp->pth_tail, | 1289 | cp->pth_tail, |
1163 | prev, | 1290 | prev, |
1164 | pth); | 1291 | pth); |
1165 | GNUNET_PEER_resolve (cp->ppd.pid, | ||
1166 | &target); | ||
1167 | if (GNUNET_YES == is_query) | 1292 | if (GNUNET_YES == is_query) |
1168 | { | 1293 | cp->ppd.pending_queries++; |
1169 | /* query, need reservation */ | ||
1170 | cp->ppd.pending_queries++; | ||
1171 | if (GNUNET_YES == cp->did_reserve) | ||
1172 | { | ||
1173 | cp->did_reserve = GNUNET_NO; | ||
1174 | /* reservation already done! */ | ||
1175 | is_ready = GNUNET_YES; | ||
1176 | ip = cp->inc_preference; | ||
1177 | cp->inc_preference = 0; | ||
1178 | cp->irc = GNUNET_CORE_peer_change_preference (GSF_core, | ||
1179 | &target, | ||
1180 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1181 | GNUNET_BANDWIDTH_VALUE_MAX, | ||
1182 | DBLOCK_SIZE, | ||
1183 | ip, | ||
1184 | &core_reserve_callback, | ||
1185 | cp); | ||
1186 | } | ||
1187 | else | ||
1188 | { | ||
1189 | /* still waiting for reservation */ | ||
1190 | is_ready = GNUNET_NO; | ||
1191 | } | ||
1192 | } | ||
1193 | else if (GNUNET_NO == is_query) | 1294 | else if (GNUNET_NO == is_query) |
1194 | { | 1295 | cp->ppd.pending_replies++; |
1195 | /* no reservation needed for content */ | 1296 | |
1196 | cp->ppd.pending_replies++; | 1297 | pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, |
1197 | is_ready = GNUNET_YES; | 1298 | &peer_transmit_timeout, |
1198 | } | ||
1199 | else | ||
1200 | { | ||
1201 | /* not a query or content, no reservation needed */ | ||
1202 | is_ready = GNUNET_YES; | ||
1203 | } | ||
1204 | if (is_ready) | ||
1205 | { | ||
1206 | pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core, | ||
1207 | GNUNET_YES, | ||
1208 | priority, | ||
1209 | timeout, | ||
1210 | &target, | ||
1211 | size, | ||
1212 | &peer_transmit_ready_cb, | ||
1213 | pth); | 1299 | pth); |
1214 | /* pth->cth could be NULL here, that's OK, we'll try again | 1300 | schedule_transmission (pth); |
1215 | later... */ | ||
1216 | } | ||
1217 | else | ||
1218 | { | ||
1219 | #if DEBUG_FS | ||
1220 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1221 | "Not ready to ask for transmission to `%s'\n", | ||
1222 | GNUNET_i2s (&target)); | ||
1223 | #endif | ||
1224 | } | ||
1225 | if (pth->cth == NULL) | ||
1226 | { | ||
1227 | #if DEBUG_FS | ||
1228 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1229 | "No transmission task scheduled, creating timeout task (%llu ms)\n", | ||
1230 | (unsigned long long) timeout.rel_value); | ||
1231 | #endif | ||
1232 | /* if we're waiting for reservation OR if we could not do notify_transmit_ready, | ||
1233 | install a timeout task to be on the safe side */ | ||
1234 | pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, | ||
1235 | &peer_transmit_timeout, | ||
1236 | pth); | ||
1237 | } | ||
1238 | return pth; | 1301 | return pth; |
1239 | } | 1302 | } |
1240 | 1303 | ||
@@ -1364,9 +1427,13 @@ cancel_pending_request (void *cls, | |||
1364 | const GNUNET_HashCode *query, | 1427 | const GNUNET_HashCode *query, |
1365 | void *value) | 1428 | void *value) |
1366 | { | 1429 | { |
1367 | struct GSF_PendingRequest *pr = value; | 1430 | struct PeerRequest *peerreq = cls; |
1431 | struct GSF_PendingRequest *pr = peerreq->pr; | ||
1368 | 1432 | ||
1369 | GSF_pending_request_cancel_ (pr); | 1433 | GSF_pending_request_cancel_ (pr); |
1434 | if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK) | ||
1435 | GNUNET_SCHEDULER_cancel (peerreq->kill_task); | ||
1436 | GNUNET_free (peerreq); | ||
1370 | return GNUNET_OK; | 1437 | return GNUNET_OK; |
1371 | } | 1438 | } |
1372 | 1439 | ||
diff --git a/src/fs/gnunet-service-fs_lc.c b/src/fs/gnunet-service-fs_lc.c index 58a1a0933..15dd4897a 100644 --- a/src/fs/gnunet-service-fs_lc.c +++ b/src/fs/gnunet-service-fs_lc.c | |||
@@ -58,6 +58,11 @@ struct ClientRequest | |||
58 | */ | 58 | */ |
59 | struct GSF_LocalClient *lc; | 59 | struct GSF_LocalClient *lc; |
60 | 60 | ||
61 | /** | ||
62 | * Task scheduled to destroy the request. | ||
63 | */ | ||
64 | GNUNET_SCHEDULER_TaskIdentifier kill_task; | ||
65 | |||
61 | }; | 66 | }; |
62 | 67 | ||
63 | 68 | ||
@@ -180,6 +185,33 @@ GSF_local_client_lookup_ (struct GNUNET_SERVER_Client *client) | |||
180 | 185 | ||
181 | 186 | ||
182 | /** | 187 | /** |
188 | * Free the given client request. | ||
189 | * | ||
190 | * @param cls the client request to free | ||
191 | * @param tc task context | ||
192 | */ | ||
193 | static void | ||
194 | client_request_destroy (void *cls, | ||
195 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
196 | { | ||
197 | struct ClientRequest *cr = cls; | ||
198 | struct GSF_LocalClient *lc; | ||
199 | |||
200 | cr->kill_task = GNUNET_SCHEDULER_NO_TASK; | ||
201 | lc = cr->lc; | ||
202 | GNUNET_CONTAINER_DLL_remove (lc->cr_head, | ||
203 | lc->cr_tail, | ||
204 | cr); | ||
205 | GSF_pending_request_cancel_ (cr->pr); | ||
206 | GNUNET_STATISTICS_update (GSF_stats, | ||
207 | gettext_noop ("# client searches active"), | ||
208 | - 1, | ||
209 | GNUNET_NO); | ||
210 | GNUNET_free (cr); | ||
211 | } | ||
212 | |||
213 | |||
214 | /** | ||
183 | * Handle a reply to a pending request. Also called if a request | 215 | * Handle a reply to a pending request. Also called if a request |
184 | * expires (then with data == NULL). The handler may be called | 216 | * expires (then with data == NULL). The handler may be called |
185 | * many times (depending on the request type), but will not be | 217 | * many times (depending on the request type), but will not be |
@@ -246,16 +278,8 @@ client_response_handler (void *cls, | |||
246 | #endif | 278 | #endif |
247 | if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) | 279 | if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) |
248 | return; | 280 | return; |
249 | GNUNET_CONTAINER_DLL_remove (lc->cr_head, | 281 | cr->kill_task = GNUNET_SCHEDULER_add_now (&client_request_destroy, |
250 | lc->cr_tail, | 282 | cr); |
251 | cr); | ||
252 | GSF_pending_request_cancel_ (cr->pr); | ||
253 | GNUNET_STATISTICS_update (GSF_stats, | ||
254 | gettext_noop ("# client searches active"), | ||
255 | - 1, | ||
256 | GNUNET_NO); | ||
257 | GNUNET_free (cr); | ||
258 | |||
259 | } | 283 | } |
260 | 284 | ||
261 | 285 | ||
@@ -489,6 +513,8 @@ GSF_client_disconnect_handler_ (void *cls, | |||
489 | gettext_noop ("# client searches active"), | 513 | gettext_noop ("# client searches active"), |
490 | - 1, | 514 | - 1, |
491 | GNUNET_NO); | 515 | GNUNET_NO); |
516 | if (GNUNET_SCHEDULER_NO_TASK != cr->kill_task) | ||
517 | GNUNET_SCHEDULER_cancel (cr->kill_task); | ||
492 | GNUNET_free (cr); | 518 | GNUNET_free (cr); |
493 | } | 519 | } |
494 | while (NULL != (res = pos->res_head)) | 520 | while (NULL != (res = pos->res_head)) |
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index b3f46cf45..28036150f 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c | |||
@@ -121,6 +121,17 @@ static struct GNUNET_CONTAINER_MultiHashMap *plans; | |||
121 | 121 | ||
122 | 122 | ||
123 | /** | 123 | /** |
124 | * Figure out when and how to transmit to the given peer. | ||
125 | * | ||
126 | * @param cls the 'struct GSF_ConnectedPeer' for transmission | ||
127 | * @param tc scheduler context | ||
128 | */ | ||
129 | static void | ||
130 | schedule_peer_transmission (void *cls, | ||
131 | const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
132 | |||
133 | |||
134 | /** | ||
124 | * Insert the given request plan into the heap with the appropriate weight. | 135 | * Insert the given request plan into the heap with the appropriate weight. |
125 | * | 136 | * |
126 | * @param pp associated peer's plan | 137 | * @param pp associated peer's plan |
@@ -156,21 +167,13 @@ plan (struct PeerPlan *pp, | |||
156 | rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap, | 167 | rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap, |
157 | rp, | 168 | rp, |
158 | rp->earliest_transmission.abs_value); | 169 | rp->earliest_transmission.abs_value); |
170 | if (GNUNET_SCHEDULER_NO_TASK != pp->task) | ||
171 | GNUNET_SCHEDULER_cancel (pp->task); | ||
172 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); | ||
159 | } | 173 | } |
160 | 174 | ||
161 | 175 | ||
162 | /** | 176 | /** |
163 | * Figure out when and how to transmit to the given peer. | ||
164 | * | ||
165 | * @param cls the 'struct GSF_ConnectedPeer' for transmission | ||
166 | * @param tc scheduler context | ||
167 | */ | ||
168 | static void | ||
169 | schedule_peer_transmission (void *cls, | ||
170 | const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
171 | |||
172 | |||
173 | /** | ||
174 | * Function called to get a message for transmission. | 177 | * Function called to get a message for transmission. |
175 | * | 178 | * |
176 | * @param cls closure | 179 | * @param cls closure |
@@ -238,7 +241,11 @@ schedule_peer_transmission (void *cls, | |||
238 | size_t msize; | 241 | size_t msize; |
239 | 242 | ||
240 | pp->task = GNUNET_SCHEDULER_NO_TASK; | 243 | pp->task = GNUNET_SCHEDULER_NO_TASK; |
241 | GNUNET_assert (NULL == pp->pth); | 244 | if (pp->pth != NULL) |
245 | { | ||
246 | GSF_peer_transmit_cancel_ (pp->pth); | ||
247 | pp->pth = NULL; | ||
248 | } | ||
242 | /* move ready requests to priority queue */ | 249 | /* move ready requests to priority queue */ |
243 | while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && | 250 | while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && |
244 | (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) ) | 251 | (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) ) |
@@ -253,7 +260,20 @@ schedule_peer_transmission (void *cls, | |||
253 | /* priority heap (still) empty, check for delay... */ | 260 | /* priority heap (still) empty, check for delay... */ |
254 | rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap); | 261 | rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap); |
255 | if (NULL == rp) | 262 | if (NULL == rp) |
256 | return; /* both queues empty */ | 263 | { |
264 | #if DEBUG_FS | ||
265 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
266 | "No active requests for plan %p.\n", | ||
267 | pp); | ||
268 | #endif | ||
269 | return; /* both queues empty */ | ||
270 | } | ||
271 | #if DEBUG_FS | ||
272 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
273 | "Sleeping for %llu ms before retrying requests on plan %p.\n", | ||
274 | (unsigned long long) GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value, | ||
275 | pp); | ||
276 | #endif | ||
257 | pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission), | 277 | pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission), |
258 | &schedule_peer_transmission, | 278 | &schedule_peer_transmission, |
259 | pp); | 279 | pp); |
@@ -267,7 +287,7 @@ schedule_peer_transmission (void *cls, | |||
267 | rp); | 287 | rp); |
268 | #endif | 288 | #endif |
269 | GNUNET_assert (NULL != rp); | 289 | GNUNET_assert (NULL != rp); |
270 | msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL); | 290 | msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL); |
271 | pp->pth = GSF_peer_transmit_ (pp->cp, | 291 | pp->pth = GSF_peer_transmit_ (pp->cp, |
272 | GNUNET_YES, | 292 | GNUNET_YES, |
273 | rp->priority, | 293 | rp->priority, |
@@ -322,31 +342,6 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp, | |||
322 | prd->rp_tail, | 342 | prd->rp_tail, |
323 | rp); | 343 | rp); |
324 | plan (pp, rp); | 344 | plan (pp, rp); |
325 | if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap)) | ||
326 | { | ||
327 | /* no request that should be done immediately, figure out delay */ | ||
328 | if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap)) | ||
329 | return; /* did not change delay heap top, no need to do anything */ | ||
330 | GNUNET_assert (NULL == pp->pth); | ||
331 | if (GNUNET_SCHEDULER_NO_TASK != pp->task) | ||
332 | GNUNET_SCHEDULER_cancel (pp->task); | ||
333 | pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission), | ||
334 | &schedule_peer_transmission, | ||
335 | pp); | ||
336 | return; | ||
337 | } | ||
338 | |||
339 | if (pp->pth != NULL) | ||
340 | { | ||
341 | if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap)) | ||
342 | return; /* did not change priority heap top, no need to do anyhing */ | ||
343 | GSF_peer_transmit_cancel_ (pp->pth); | ||
344 | pp->pth = NULL; | ||
345 | } | ||
346 | if (GNUNET_SCHEDULER_NO_TASK != pp->task) | ||
347 | GNUNET_SCHEDULER_cancel (pp->task); | ||
348 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, | ||
349 | pp); | ||
350 | } | 345 | } |
351 | 346 | ||
352 | 347 | ||
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index f327d9b4b..7406bed0f 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c | |||
@@ -1008,7 +1008,7 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr) | |||
1008 | */ | 1008 | */ |
1009 | static void | 1009 | static void |
1010 | process_local_reply (void *cls, | 1010 | process_local_reply (void *cls, |
1011 | const GNUNET_HashCode * key, | 1011 | const GNUNET_HashCode *key, |
1012 | size_t size, | 1012 | size_t size, |
1013 | const void *data, | 1013 | const void *data, |
1014 | enum GNUNET_BLOCK_Type type, | 1014 | enum GNUNET_BLOCK_Type type, |
diff --git a/src/fs/test_gnunet_service_fs_p2p.c b/src/fs/test_gnunet_service_fs_p2p.c index 3bb808c48..91cfda0eb 100644 --- a/src/fs/test_gnunet_service_fs_p2p.c +++ b/src/fs/test_gnunet_service_fs_p2p.c | |||
@@ -34,9 +34,9 @@ | |||
34 | #define FILESIZE (1024 * 1024 * 2) | 34 | #define FILESIZE (1024 * 1024 * 2) |
35 | 35 | ||
36 | /** | 36 | /** |
37 | * How long until we give up on transmitting the message? | 37 | * How long until we give up on the download? |
38 | */ | 38 | */ |
39 | #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 600) | 39 | #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60) |
40 | 40 | ||
41 | #define NUM_DAEMONS 2 | 41 | #define NUM_DAEMONS 2 |
42 | 42 | ||