aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-04-21 15:14:14 +0000
committerChristian Grothoff <christian@grothoff.org>2011-04-21 15:14:14 +0000
commit41700b6cb0345080513d82bdbe2dbe33c8654491 (patch)
treefeab3a7f27cdadd01897239c86cbdd137da55b8f /src
parentbbf3c4e04289295df264539e1017f3778f45f97f (diff)
downloadgnunet-41700b6cb0345080513d82bdbe2dbe33c8654491.tar.gz
gnunet-41700b6cb0345080513d82bdbe2dbe33c8654491.zip
wip
Diffstat (limited to 'src')
-rw-r--r--src/fs/fs_download.c2
-rw-r--r--src/fs/gnunet-service-fs_cp.c299
-rw-r--r--src/fs/gnunet-service-fs_lc.c46
-rw-r--r--src/fs/gnunet-service-fs_pe.c73
-rw-r--r--src/fs/gnunet-service-fs_pr.c2
-rw-r--r--src/fs/test_gnunet_service_fs_p2p.c4
6 files changed, 257 insertions, 169 deletions
diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c
index 785803edf..8192b8c1f 100644
--- a/src/fs/fs_download.c
+++ b/src/fs/fs_download.c
@@ -31,7 +31,7 @@
31#include "fs.h" 31#include "fs.h"
32#include "fs_tree.h" 32#include "fs_tree.h"
33 33
34#define DEBUG_DOWNLOAD GNUNET_YES 34#define DEBUG_DOWNLOAD GNUNET_NO
35 35
36/** 36/**
37 * Determine if the given download (options and meta data) should cause 37 * Determine if the given download (options and meta data) should cause
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 5aba83298..2522cbe7b 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -105,6 +105,11 @@ struct GSF_PeerTransmitHandle
105 int is_query; 105 int is_query;
106 106
107 /** 107 /**
108 * Did we get a reservation already?
109 */
110 int was_reserved;
111
112 /**
108 * Priority of this request. 113 * Priority of this request.
109 */ 114 */
110 uint32_t priority; 115 uint32_t priority;
@@ -113,6 +118,30 @@ struct GSF_PeerTransmitHandle
113 118
114 119
115/** 120/**
121 * Information per peer and request.
122 */
123struct PeerRequest
124{
125
126 /**
127 * Handle to generic request.
128 */
129 struct GSF_PendingRequest *pr;
130
131 /**
132 * Handle to specific peer.
133 */
134 struct GSF_ConnectedPeer *cp;
135
136 /**
137 * Task for asynchronous stopping of this request.
138 */
139 GNUNET_SCHEDULER_TaskIdentifier kill_task;
140
141};
142
143
144/**
116 * A connected peer. 145 * A connected peer.
117 */ 146 */
118struct GSF_ConnectedPeer 147struct GSF_ConnectedPeer
@@ -162,7 +191,7 @@ struct GSF_ConnectedPeer
162 GNUNET_SCHEDULER_TaskIdentifier irc_delay_task; 191 GNUNET_SCHEDULER_TaskIdentifier irc_delay_task;
163 192
164 /** 193 /**
165 * Active requests from this neighbour. 194 * Active requests from this neighbour, map of query to 'struct PeerRequest'.
166 */ 195 */
167 struct GNUNET_CONTAINER_MultiHashMap *request_map; 196 struct GNUNET_CONTAINER_MultiHashMap *request_map;
168 197
@@ -299,12 +328,99 @@ GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
299static size_t 328static size_t
300peer_transmit_ready_cb (void *cls, 329peer_transmit_ready_cb (void *cls,
301 size_t size, 330 size_t size,
331 void *buf);
332
333
334
335
336/**
337 * Function called by core upon success or failure of our bandwidth reservation request.
338 *
339 * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
340 * @param peer identifies the peer
341 * @param bandwidth_out available amount of outbound bandwidth
342 * @param amount set to the amount that was actually reserved or unreserved;
343 * either the full requested amount or zero (no partial reservations)
344 * @param res_delay if the reservation could not be satisfied (amount was 0), how
345 * long should the client wait until re-trying?
346 * @param preference current traffic preference for the given peer
347 */
348static void
349core_reserve_callback (void *cls,
350 const struct GNUNET_PeerIdentity *peer,
351 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
352 int32_t amount,
353 struct GNUNET_TIME_Relative res_delay,
354 uint64_t preference);
355
356
357/**
358 * If ready (bandwidth reserved), try to schedule transmission via
359 * core for the given handle.
360 *
361 * @param pth transmission handle to schedule
362 */
363static void
364schedule_transmission (struct GSF_PeerTransmitHandle *pth)
365{
366 struct GSF_ConnectedPeer *cp;
367 struct GNUNET_PeerIdentity target;
368 uint64_t ip;
369
370 if (NULL != pth->cth)
371 return; /* already done */
372 cp = pth->cp;
373 GNUNET_PEER_resolve (cp->ppd.pid,
374 &target);
375 if ( (GNUNET_YES == pth->is_query) &&
376 (GNUNET_YES != pth->was_reserved) )
377 {
378 /* query, need reservation */
379 if (GNUNET_YES != cp->did_reserve)
380 return; /* not ready */
381 cp->did_reserve = GNUNET_NO;
382 /* reservation already done! */
383 pth->was_reserved = GNUNET_YES;
384 ip = cp->inc_preference;
385 cp->inc_preference = 0;
386 cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
387 &target,
388 GNUNET_TIME_UNIT_FOREVER_REL,
389 GNUNET_BANDWIDTH_VALUE_MAX,
390 DBLOCK_SIZE,
391 ip,
392 &core_reserve_callback,
393 cp);
394 }
395 pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
396 GNUNET_YES,
397 pth->priority,
398 GNUNET_TIME_absolute_get_remaining (pth->timeout),
399 &target,
400 pth->size,
401 &peer_transmit_ready_cb,
402 pth);
403}
404
405
406/**
407 * Core is ready to transmit to a peer, get the message.
408 *
409 * @param cls the 'struct GSF_PeerTransmitHandle' of the message
410 * @param size number of bytes core is willing to take
411 * @param buf where to copy the message
412 * @return number of bytes copied to buf
413 */
414static size_t
415peer_transmit_ready_cb (void *cls,
416 size_t size,
302 void *buf) 417 void *buf)
303{ 418{
304 struct GSF_PeerTransmitHandle *pth = cls; 419 struct GSF_PeerTransmitHandle *pth = cls;
305 struct GSF_ConnectedPeer *cp; 420 struct GSF_ConnectedPeer *cp;
306 size_t ret; 421 size_t ret;
307 422
423 pth->cth = NULL;
308 if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK) 424 if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
309 { 425 {
310 GNUNET_SCHEDULER_cancel (pth->timeout_task); 426 GNUNET_SCHEDULER_cancel (pth->timeout_task);
@@ -327,33 +443,14 @@ peer_transmit_ready_cb (void *cls,
327 GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value); 443 GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value);
328 ret = pth->gmc (pth->gmc_cls, 444 ret = pth->gmc (pth->gmc_cls,
329 size, buf); 445 size, buf);
330 GNUNET_free (pth); 446 GNUNET_free (pth);
447 for (pth = cp->pth_head; pth != NULL; pth = pth->next)
448 schedule_transmission (pth);
331 return ret; 449 return ret;
332} 450}
333 451
334 452
335/** 453/**
336 * Function called by core upon success or failure of our bandwidth reservation request.
337 *
338 * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
339 * @param peer identifies the peer
340 * @param bandwidth_out available amount of outbound bandwidth
341 * @param amount set to the amount that was actually reserved or unreserved;
342 * either the full requested amount or zero (no partial reservations)
343 * @param res_delay if the reservation could not be satisfied (amount was 0), how
344 * long should the client wait until re-trying?
345 * @param preference current traffic preference for the given peer
346 */
347static void
348core_reserve_callback (void *cls,
349 const struct GNUNET_PeerIdentity *peer,
350 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
351 int32_t amount,
352 struct GNUNET_TIME_Relative res_delay,
353 uint64_t preference);
354
355
356/**
357 * (re)try to reserve bandwidth from the given peer. 454 * (re)try to reserve bandwidth from the given peer.
358 * 455 *
359 * @param cls the 'struct GSF_ConnectedPeer' to reserve from 456 * @param cls the 'struct GSF_ConnectedPeer' to reserve from
@@ -607,6 +704,36 @@ copy_reply (void *cls,
607 704
608 705
609/** 706/**
707 * Free the given client request.
708 *
709 * @param cls the client request to free
710 * @param tc task context
711 */
712static void
713peer_request_destroy (void *cls,
714 const struct GNUNET_SCHEDULER_TaskContext *tc)
715{
716 struct PeerRequest *peerreq = cls;
717 struct GSF_PendingRequest *pr = peerreq->pr;
718 struct GSF_ConnectedPeer *cp = peerreq->cp;
719 struct GSF_PendingRequestData *prd;
720
721 peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
722 prd = GSF_pending_request_get_data_ (pr);
723 GNUNET_STATISTICS_update (GSF_stats,
724 gettext_noop ("# P2P searches active"),
725 -1,
726 GNUNET_NO);
727 GNUNET_break (GNUNET_OK ==
728 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
729 &prd->query,
730 peerreq));
731 GSF_pending_request_cancel_ (pr);
732 GNUNET_free (peerreq);
733}
734
735
736/**
610 * Handle a reply to a pending request. Also called if a request 737 * Handle a reply to a pending request. Also called if a request
611 * expires (then with data == NULL). The handler may be called 738 * expires (then with data == NULL). The handler may be called
612 * many times (depending on the request type), but will not be 739 * many times (depending on the request type), but will not be
@@ -614,8 +741,7 @@ copy_reply (void *cls,
614 * and will also not be called anymore after a call signalling 741 * and will also not be called anymore after a call signalling
615 * expiration. 742 * expiration.
616 * 743 *
617 * @param cls 'struct GSF_ConnectedPeer' of the peer that would 744 * @param cls 'struct PeerRequest' this is an answer for
618 * have liked an answer to the request
619 * @param eval evaluation of the result 745 * @param eval evaluation of the result
620 * @param pr handle to the original pending request 746 * @param pr handle to the original pending request
621 * @param expiration when does 'data' expire? 747 * @param expiration when does 'data' expire?
@@ -632,12 +758,14 @@ handle_p2p_reply (void *cls,
632 const void *data, 758 const void *data,
633 size_t data_len) 759 size_t data_len)
634{ 760{
635 struct GSF_ConnectedPeer *cp = cls; 761 struct PeerRequest *peerreq = cls;
762 struct GSF_ConnectedPeer *cp = peerreq->cp;
636 struct GSF_PendingRequestData *prd; 763 struct GSF_PendingRequestData *prd;
637 struct PutMessage *pm; 764 struct PutMessage *pm;
638 size_t msize; 765 size_t msize;
639 766
640 GNUNET_assert (data_len < GNUNET_SERVER_MAX_MESSAGE_SIZE); 767 GNUNET_assert (data_len < GNUNET_SERVER_MAX_MESSAGE_SIZE);
768 GNUNET_assert (peerreq->pr == pr);
641 prd = GSF_pending_request_get_data_ (pr); 769 prd = GSF_pending_request_get_data_ (pr);
642 if (NULL == data) 770 if (NULL == data)
643 { 771 {
@@ -648,7 +776,8 @@ handle_p2p_reply (void *cls,
648 GNUNET_break (GNUNET_OK == 776 GNUNET_break (GNUNET_OK ==
649 GNUNET_CONTAINER_multihashmap_remove (cp->request_map, 777 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
650 &prd->query, 778 &prd->query,
651 pr)); 779 peerreq));
780 GNUNET_free (peerreq);
652 return; 781 return;
653 } 782 }
654 GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY); 783 GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY);
@@ -687,15 +816,8 @@ handle_p2p_reply (void *cls,
687 pm); 816 pm);
688 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) 817 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
689 return; 818 return;
690 GNUNET_STATISTICS_update (GSF_stats, 819 peerreq->kill_task = GNUNET_SCHEDULER_add_now (&peer_request_destroy,
691 gettext_noop ("# P2P searches active"), 820 peerreq);
692 -1,
693 GNUNET_NO);
694 GNUNET_break (GNUNET_OK ==
695 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
696 &prd->query,
697 pr));
698 GSF_pending_request_cancel_ (pr);
699} 821}
700 822
701 823
@@ -844,6 +966,7 @@ struct GSF_PendingRequest *
844GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, 966GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
845 const struct GNUNET_MessageHeader *message) 967 const struct GNUNET_MessageHeader *message)
846{ 968{
969 struct PeerRequest *peerreq;
847 struct GSF_PendingRequest *pr; 970 struct GSF_PendingRequest *pr;
848 struct GSF_PendingRequestData *prd; 971 struct GSF_PendingRequestData *prd;
849 struct GSF_ConnectedPeer *cp; 972 struct GSF_ConnectedPeer *cp;
@@ -1002,10 +1125,11 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1002 ttl -= ttl_decrement; 1125 ttl -= ttl_decrement;
1003 1126
1004 /* test if the request already exists */ 1127 /* test if the request already exists */
1005 pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map, 1128 peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
1006 &gm->query); 1129 &gm->query);
1007 if (pr != NULL) 1130 if (peerreq != NULL)
1008 { 1131 {
1132 pr = peerreq->pr;
1009 prd = GSF_pending_request_get_data_ (pr); 1133 prd = GSF_pending_request_get_data_ (pr);
1010 if ( (prd->type == type) && 1134 if ( (prd->type == type) &&
1011 ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) || 1135 ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) ||
@@ -1034,10 +1158,15 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1034 GNUNET_assert (GNUNET_YES == 1158 GNUNET_assert (GNUNET_YES ==
1035 GNUNET_CONTAINER_multihashmap_remove (cp->request_map, 1159 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
1036 &gm->query, 1160 &gm->query,
1037 pr)); 1161 peerreq));
1162 if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
1163 GNUNET_SCHEDULER_cancel (peerreq->kill_task);
1164 GNUNET_free (peerreq);
1038 } 1165 }
1039 } 1166 }
1040 1167
1168 peerreq = GNUNET_malloc (sizeof (struct PeerRequest));
1169 peerreq->cp = cp;
1041 pr = GSF_pending_request_create_ (options, 1170 pr = GSF_pending_request_create_ (options,
1042 type, 1171 type,
1043 &gm->query, 1172 &gm->query,
@@ -1052,11 +1181,12 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1052 spid, 1181 spid,
1053 NULL, 0, /* replies_seen */ 1182 NULL, 0, /* replies_seen */
1054 &handle_p2p_reply, 1183 &handle_p2p_reply,
1055 cp); 1184 peerreq);
1185 peerreq->pr = pr;
1056 GNUNET_break (GNUNET_OK == 1186 GNUNET_break (GNUNET_OK ==
1057 GNUNET_CONTAINER_multihashmap_put (cp->request_map, 1187 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
1058 &gm->query, 1188 &gm->query,
1059 pr, 1189 peerreq,
1060 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 1190 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1061 GNUNET_STATISTICS_update (GSF_stats, 1191 GNUNET_STATISTICS_update (GSF_stats,
1062 gettext_noop ("# P2P searches received"), 1192 gettext_noop ("# P2P searches received"),
@@ -1131,9 +1261,6 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1131 struct GSF_PeerTransmitHandle *pth; 1261 struct GSF_PeerTransmitHandle *pth;
1132 struct GSF_PeerTransmitHandle *pos; 1262 struct GSF_PeerTransmitHandle *pos;
1133 struct GSF_PeerTransmitHandle *prev; 1263 struct GSF_PeerTransmitHandle *prev;
1134 struct GNUNET_PeerIdentity target;
1135 uint64_t ip;
1136 int is_ready;
1137 1264
1138 pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle)); 1265 pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
1139 pth->transmission_request_start_time = GNUNET_TIME_absolute_get (); 1266 pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
@@ -1162,79 +1289,15 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1162 cp->pth_tail, 1289 cp->pth_tail,
1163 prev, 1290 prev,
1164 pth); 1291 pth);
1165 GNUNET_PEER_resolve (cp->ppd.pid,
1166 &target);
1167 if (GNUNET_YES == is_query) 1292 if (GNUNET_YES == is_query)
1168 { 1293 cp->ppd.pending_queries++;
1169 /* query, need reservation */
1170 cp->ppd.pending_queries++;
1171 if (GNUNET_YES == cp->did_reserve)
1172 {
1173 cp->did_reserve = GNUNET_NO;
1174 /* reservation already done! */
1175 is_ready = GNUNET_YES;
1176 ip = cp->inc_preference;
1177 cp->inc_preference = 0;
1178 cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
1179 &target,
1180 GNUNET_TIME_UNIT_FOREVER_REL,
1181 GNUNET_BANDWIDTH_VALUE_MAX,
1182 DBLOCK_SIZE,
1183 ip,
1184 &core_reserve_callback,
1185 cp);
1186 }
1187 else
1188 {
1189 /* still waiting for reservation */
1190 is_ready = GNUNET_NO;
1191 }
1192 }
1193 else if (GNUNET_NO == is_query) 1294 else if (GNUNET_NO == is_query)
1194 { 1295 cp->ppd.pending_replies++;
1195 /* no reservation needed for content */ 1296
1196 cp->ppd.pending_replies++; 1297 pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
1197 is_ready = GNUNET_YES; 1298 &peer_transmit_timeout,
1198 }
1199 else
1200 {
1201 /* not a query or content, no reservation needed */
1202 is_ready = GNUNET_YES;
1203 }
1204 if (is_ready)
1205 {
1206 pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
1207 GNUNET_YES,
1208 priority,
1209 timeout,
1210 &target,
1211 size,
1212 &peer_transmit_ready_cb,
1213 pth); 1299 pth);
1214 /* pth->cth could be NULL here, that's OK, we'll try again 1300 schedule_transmission (pth);
1215 later... */
1216 }
1217 else
1218 {
1219#if DEBUG_FS
1220 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221 "Not ready to ask for transmission to `%s'\n",
1222 GNUNET_i2s (&target));
1223#endif
1224 }
1225 if (pth->cth == NULL)
1226 {
1227#if DEBUG_FS
1228 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1229 "No transmission task scheduled, creating timeout task (%llu ms)\n",
1230 (unsigned long long) timeout.rel_value);
1231#endif
1232 /* if we're waiting for reservation OR if we could not do notify_transmit_ready,
1233 install a timeout task to be on the safe side */
1234 pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
1235 &peer_transmit_timeout,
1236 pth);
1237 }
1238 return pth; 1301 return pth;
1239} 1302}
1240 1303
@@ -1364,9 +1427,13 @@ cancel_pending_request (void *cls,
1364 const GNUNET_HashCode *query, 1427 const GNUNET_HashCode *query,
1365 void *value) 1428 void *value)
1366{ 1429{
1367 struct GSF_PendingRequest *pr = value; 1430 struct PeerRequest *peerreq = cls;
1431 struct GSF_PendingRequest *pr = peerreq->pr;
1368 1432
1369 GSF_pending_request_cancel_ (pr); 1433 GSF_pending_request_cancel_ (pr);
1434 if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
1435 GNUNET_SCHEDULER_cancel (peerreq->kill_task);
1436 GNUNET_free (peerreq);
1370 return GNUNET_OK; 1437 return GNUNET_OK;
1371} 1438}
1372 1439
diff --git a/src/fs/gnunet-service-fs_lc.c b/src/fs/gnunet-service-fs_lc.c
index 58a1a0933..15dd4897a 100644
--- a/src/fs/gnunet-service-fs_lc.c
+++ b/src/fs/gnunet-service-fs_lc.c
@@ -58,6 +58,11 @@ struct ClientRequest
58 */ 58 */
59 struct GSF_LocalClient *lc; 59 struct GSF_LocalClient *lc;
60 60
61 /**
62 * Task scheduled to destroy the request.
63 */
64 GNUNET_SCHEDULER_TaskIdentifier kill_task;
65
61}; 66};
62 67
63 68
@@ -180,6 +185,33 @@ GSF_local_client_lookup_ (struct GNUNET_SERVER_Client *client)
180 185
181 186
182/** 187/**
188 * Free the given client request.
189 *
190 * @param cls the client request to free
191 * @param tc task context
192 */
193static void
194client_request_destroy (void *cls,
195 const struct GNUNET_SCHEDULER_TaskContext *tc)
196{
197 struct ClientRequest *cr = cls;
198 struct GSF_LocalClient *lc;
199
200 cr->kill_task = GNUNET_SCHEDULER_NO_TASK;
201 lc = cr->lc;
202 GNUNET_CONTAINER_DLL_remove (lc->cr_head,
203 lc->cr_tail,
204 cr);
205 GSF_pending_request_cancel_ (cr->pr);
206 GNUNET_STATISTICS_update (GSF_stats,
207 gettext_noop ("# client searches active"),
208 - 1,
209 GNUNET_NO);
210 GNUNET_free (cr);
211}
212
213
214/**
183 * Handle a reply to a pending request. Also called if a request 215 * Handle a reply to a pending request. Also called if a request
184 * expires (then with data == NULL). The handler may be called 216 * expires (then with data == NULL). The handler may be called
185 * many times (depending on the request type), but will not be 217 * many times (depending on the request type), but will not be
@@ -246,16 +278,8 @@ client_response_handler (void *cls,
246#endif 278#endif
247 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) 279 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
248 return; 280 return;
249 GNUNET_CONTAINER_DLL_remove (lc->cr_head, 281 cr->kill_task = GNUNET_SCHEDULER_add_now (&client_request_destroy,
250 lc->cr_tail, 282 cr);
251 cr);
252 GSF_pending_request_cancel_ (cr->pr);
253 GNUNET_STATISTICS_update (GSF_stats,
254 gettext_noop ("# client searches active"),
255 - 1,
256 GNUNET_NO);
257 GNUNET_free (cr);
258
259} 283}
260 284
261 285
@@ -489,6 +513,8 @@ GSF_client_disconnect_handler_ (void *cls,
489 gettext_noop ("# client searches active"), 513 gettext_noop ("# client searches active"),
490 - 1, 514 - 1,
491 GNUNET_NO); 515 GNUNET_NO);
516 if (GNUNET_SCHEDULER_NO_TASK != cr->kill_task)
517 GNUNET_SCHEDULER_cancel (cr->kill_task);
492 GNUNET_free (cr); 518 GNUNET_free (cr);
493 } 519 }
494 while (NULL != (res = pos->res_head)) 520 while (NULL != (res = pos->res_head))
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index b3f46cf45..28036150f 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -121,6 +121,17 @@ static struct GNUNET_CONTAINER_MultiHashMap *plans;
121 121
122 122
123/** 123/**
124 * Figure out when and how to transmit to the given peer.
125 *
126 * @param cls the 'struct GSF_ConnectedPeer' for transmission
127 * @param tc scheduler context
128 */
129static void
130schedule_peer_transmission (void *cls,
131 const struct GNUNET_SCHEDULER_TaskContext *tc);
132
133
134/**
124 * Insert the given request plan into the heap with the appropriate weight. 135 * Insert the given request plan into the heap with the appropriate weight.
125 * 136 *
126 * @param pp associated peer's plan 137 * @param pp associated peer's plan
@@ -156,21 +167,13 @@ plan (struct PeerPlan *pp,
156 rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap, 167 rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
157 rp, 168 rp,
158 rp->earliest_transmission.abs_value); 169 rp->earliest_transmission.abs_value);
170 if (GNUNET_SCHEDULER_NO_TASK != pp->task)
171 GNUNET_SCHEDULER_cancel (pp->task);
172 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
159} 173}
160 174
161 175
162/** 176/**
163 * Figure out when and how to transmit to the given peer.
164 *
165 * @param cls the 'struct GSF_ConnectedPeer' for transmission
166 * @param tc scheduler context
167 */
168static void
169schedule_peer_transmission (void *cls,
170 const struct GNUNET_SCHEDULER_TaskContext *tc);
171
172
173/**
174 * Function called to get a message for transmission. 177 * Function called to get a message for transmission.
175 * 178 *
176 * @param cls closure 179 * @param cls closure
@@ -238,7 +241,11 @@ schedule_peer_transmission (void *cls,
238 size_t msize; 241 size_t msize;
239 242
240 pp->task = GNUNET_SCHEDULER_NO_TASK; 243 pp->task = GNUNET_SCHEDULER_NO_TASK;
241 GNUNET_assert (NULL == pp->pth); 244 if (pp->pth != NULL)
245 {
246 GSF_peer_transmit_cancel_ (pp->pth);
247 pp->pth = NULL;
248 }
242 /* move ready requests to priority queue */ 249 /* move ready requests to priority queue */
243 while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && 250 while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
244 (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) ) 251 (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
@@ -253,7 +260,20 @@ schedule_peer_transmission (void *cls,
253 /* priority heap (still) empty, check for delay... */ 260 /* priority heap (still) empty, check for delay... */
254 rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap); 261 rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
255 if (NULL == rp) 262 if (NULL == rp)
256 return; /* both queues empty */ 263 {
264#if DEBUG_FS
265 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
266 "No active requests for plan %p.\n",
267 pp);
268#endif
269 return; /* both queues empty */
270 }
271#if DEBUG_FS
272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
273 "Sleeping for %llu ms before retrying requests on plan %p.\n",
274 (unsigned long long) GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value,
275 pp);
276#endif
257 pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission), 277 pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
258 &schedule_peer_transmission, 278 &schedule_peer_transmission,
259 pp); 279 pp);
@@ -267,7 +287,7 @@ schedule_peer_transmission (void *cls,
267 rp); 287 rp);
268#endif 288#endif
269 GNUNET_assert (NULL != rp); 289 GNUNET_assert (NULL != rp);
270 msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL); 290 msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);
271 pp->pth = GSF_peer_transmit_ (pp->cp, 291 pp->pth = GSF_peer_transmit_ (pp->cp,
272 GNUNET_YES, 292 GNUNET_YES,
273 rp->priority, 293 rp->priority,
@@ -322,31 +342,6 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
322 prd->rp_tail, 342 prd->rp_tail,
323 rp); 343 rp);
324 plan (pp, rp); 344 plan (pp, rp);
325 if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
326 {
327 /* no request that should be done immediately, figure out delay */
328 if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap))
329 return; /* did not change delay heap top, no need to do anything */
330 GNUNET_assert (NULL == pp->pth);
331 if (GNUNET_SCHEDULER_NO_TASK != pp->task)
332 GNUNET_SCHEDULER_cancel (pp->task);
333 pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
334 &schedule_peer_transmission,
335 pp);
336 return;
337 }
338
339 if (pp->pth != NULL)
340 {
341 if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap))
342 return; /* did not change priority heap top, no need to do anyhing */
343 GSF_peer_transmit_cancel_ (pp->pth);
344 pp->pth = NULL;
345 }
346 if (GNUNET_SCHEDULER_NO_TASK != pp->task)
347 GNUNET_SCHEDULER_cancel (pp->task);
348 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
349 pp);
350} 345}
351 346
352 347
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index f327d9b4b..7406bed0f 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -1008,7 +1008,7 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1008 */ 1008 */
1009static void 1009static void
1010process_local_reply (void *cls, 1010process_local_reply (void *cls,
1011 const GNUNET_HashCode * key, 1011 const GNUNET_HashCode *key,
1012 size_t size, 1012 size_t size,
1013 const void *data, 1013 const void *data,
1014 enum GNUNET_BLOCK_Type type, 1014 enum GNUNET_BLOCK_Type type,
diff --git a/src/fs/test_gnunet_service_fs_p2p.c b/src/fs/test_gnunet_service_fs_p2p.c
index 3bb808c48..91cfda0eb 100644
--- a/src/fs/test_gnunet_service_fs_p2p.c
+++ b/src/fs/test_gnunet_service_fs_p2p.c
@@ -34,9 +34,9 @@
34#define FILESIZE (1024 * 1024 * 2) 34#define FILESIZE (1024 * 1024 * 2)
35 35
36/** 36/**
37 * How long until we give up on transmitting the message? 37 * How long until we give up on the download?
38 */ 38 */
39#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 600) 39#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
40 40
41#define NUM_DAEMONS 2 41#define NUM_DAEMONS 2
42 42