aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cp.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-04-21 15:14:14 +0000
committerChristian Grothoff <christian@grothoff.org>2011-04-21 15:14:14 +0000
commit41700b6cb0345080513d82bdbe2dbe33c8654491 (patch)
treefeab3a7f27cdadd01897239c86cbdd137da55b8f /src/fs/gnunet-service-fs_cp.c
parentbbf3c4e04289295df264539e1017f3778f45f97f (diff)
downloadgnunet-41700b6cb0345080513d82bdbe2dbe33c8654491.tar.gz
gnunet-41700b6cb0345080513d82bdbe2dbe33c8654491.zip
wip
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r--src/fs/gnunet-service-fs_cp.c299
1 files changed, 183 insertions, 116 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 5aba83298..2522cbe7b 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -105,6 +105,11 @@ struct GSF_PeerTransmitHandle
105 int is_query; 105 int is_query;
106 106
107 /** 107 /**
108 * Did we get a reservation already?
109 */
110 int was_reserved;
111
112 /**
108 * Priority of this request. 113 * Priority of this request.
109 */ 114 */
110 uint32_t priority; 115 uint32_t priority;
@@ -113,6 +118,30 @@ struct GSF_PeerTransmitHandle
113 118
114 119
115/** 120/**
121 * Information per peer and request.
122 */
123struct PeerRequest
124{
125
126 /**
127 * Handle to generic request.
128 */
129 struct GSF_PendingRequest *pr;
130
131 /**
132 * Handle to specific peer.
133 */
134 struct GSF_ConnectedPeer *cp;
135
136 /**
137 * Task for asynchronous stopping of this request.
138 */
139 GNUNET_SCHEDULER_TaskIdentifier kill_task;
140
141};
142
143
144/**
116 * A connected peer. 145 * A connected peer.
117 */ 146 */
118struct GSF_ConnectedPeer 147struct GSF_ConnectedPeer
@@ -162,7 +191,7 @@ struct GSF_ConnectedPeer
162 GNUNET_SCHEDULER_TaskIdentifier irc_delay_task; 191 GNUNET_SCHEDULER_TaskIdentifier irc_delay_task;
163 192
164 /** 193 /**
165 * Active requests from this neighbour. 194 * Active requests from this neighbour, map of query to 'struct PeerRequest'.
166 */ 195 */
167 struct GNUNET_CONTAINER_MultiHashMap *request_map; 196 struct GNUNET_CONTAINER_MultiHashMap *request_map;
168 197
@@ -299,12 +328,99 @@ GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
299static size_t 328static size_t
300peer_transmit_ready_cb (void *cls, 329peer_transmit_ready_cb (void *cls,
301 size_t size, 330 size_t size,
331 void *buf);
332
333
334
335
336/**
337 * Function called by core upon success or failure of our bandwidth reservation request.
338 *
339 * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
340 * @param peer identifies the peer
341 * @param bandwidth_out available amount of outbound bandwidth
342 * @param amount set to the amount that was actually reserved or unreserved;
343 * either the full requested amount or zero (no partial reservations)
344 * @param res_delay if the reservation could not be satisfied (amount was 0), how
345 * long should the client wait until re-trying?
346 * @param preference current traffic preference for the given peer
347 */
348static void
349core_reserve_callback (void *cls,
350 const struct GNUNET_PeerIdentity *peer,
351 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
352 int32_t amount,
353 struct GNUNET_TIME_Relative res_delay,
354 uint64_t preference);
355
356
357/**
358 * If ready (bandwidth reserved), try to schedule transmission via
359 * core for the given handle.
360 *
361 * @param pth transmission handle to schedule
362 */
363static void
364schedule_transmission (struct GSF_PeerTransmitHandle *pth)
365{
366 struct GSF_ConnectedPeer *cp;
367 struct GNUNET_PeerIdentity target;
368 uint64_t ip;
369
370 if (NULL != pth->cth)
371 return; /* already done */
372 cp = pth->cp;
373 GNUNET_PEER_resolve (cp->ppd.pid,
374 &target);
375 if ( (GNUNET_YES == pth->is_query) &&
376 (GNUNET_YES != pth->was_reserved) )
377 {
378 /* query, need reservation */
379 if (GNUNET_YES != cp->did_reserve)
380 return; /* not ready */
381 cp->did_reserve = GNUNET_NO;
382 /* reservation already done! */
383 pth->was_reserved = GNUNET_YES;
384 ip = cp->inc_preference;
385 cp->inc_preference = 0;
386 cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
387 &target,
388 GNUNET_TIME_UNIT_FOREVER_REL,
389 GNUNET_BANDWIDTH_VALUE_MAX,
390 DBLOCK_SIZE,
391 ip,
392 &core_reserve_callback,
393 cp);
394 }
395 pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
396 GNUNET_YES,
397 pth->priority,
398 GNUNET_TIME_absolute_get_remaining (pth->timeout),
399 &target,
400 pth->size,
401 &peer_transmit_ready_cb,
402 pth);
403}
404
405
406/**
407 * Core is ready to transmit to a peer, get the message.
408 *
409 * @param cls the 'struct GSF_PeerTransmitHandle' of the message
410 * @param size number of bytes core is willing to take
411 * @param buf where to copy the message
412 * @return number of bytes copied to buf
413 */
414static size_t
415peer_transmit_ready_cb (void *cls,
416 size_t size,
302 void *buf) 417 void *buf)
303{ 418{
304 struct GSF_PeerTransmitHandle *pth = cls; 419 struct GSF_PeerTransmitHandle *pth = cls;
305 struct GSF_ConnectedPeer *cp; 420 struct GSF_ConnectedPeer *cp;
306 size_t ret; 421 size_t ret;
307 422
423 pth->cth = NULL;
308 if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK) 424 if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
309 { 425 {
310 GNUNET_SCHEDULER_cancel (pth->timeout_task); 426 GNUNET_SCHEDULER_cancel (pth->timeout_task);
@@ -327,33 +443,14 @@ peer_transmit_ready_cb (void *cls,
327 GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value); 443 GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value);
328 ret = pth->gmc (pth->gmc_cls, 444 ret = pth->gmc (pth->gmc_cls,
329 size, buf); 445 size, buf);
330 GNUNET_free (pth); 446 GNUNET_free (pth);
447 for (pth = cp->pth_head; pth != NULL; pth = pth->next)
448 schedule_transmission (pth);
331 return ret; 449 return ret;
332} 450}
333 451
334 452
335/** 453/**
336 * Function called by core upon success or failure of our bandwidth reservation request.
337 *
338 * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
339 * @param peer identifies the peer
340 * @param bandwidth_out available amount of outbound bandwidth
341 * @param amount set to the amount that was actually reserved or unreserved;
342 * either the full requested amount or zero (no partial reservations)
343 * @param res_delay if the reservation could not be satisfied (amount was 0), how
344 * long should the client wait until re-trying?
345 * @param preference current traffic preference for the given peer
346 */
347static void
348core_reserve_callback (void *cls,
349 const struct GNUNET_PeerIdentity *peer,
350 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
351 int32_t amount,
352 struct GNUNET_TIME_Relative res_delay,
353 uint64_t preference);
354
355
356/**
357 * (re)try to reserve bandwidth from the given peer. 454 * (re)try to reserve bandwidth from the given peer.
358 * 455 *
359 * @param cls the 'struct GSF_ConnectedPeer' to reserve from 456 * @param cls the 'struct GSF_ConnectedPeer' to reserve from
@@ -607,6 +704,36 @@ copy_reply (void *cls,
607 704
608 705
609/** 706/**
707 * Free the given client request.
708 *
709 * @param cls the client request to free
710 * @param tc task context
711 */
712static void
713peer_request_destroy (void *cls,
714 const struct GNUNET_SCHEDULER_TaskContext *tc)
715{
716 struct PeerRequest *peerreq = cls;
717 struct GSF_PendingRequest *pr = peerreq->pr;
718 struct GSF_ConnectedPeer *cp = peerreq->cp;
719 struct GSF_PendingRequestData *prd;
720
721 peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
722 prd = GSF_pending_request_get_data_ (pr);
723 GNUNET_STATISTICS_update (GSF_stats,
724 gettext_noop ("# P2P searches active"),
725 -1,
726 GNUNET_NO);
727 GNUNET_break (GNUNET_OK ==
728 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
729 &prd->query,
730 peerreq));
731 GSF_pending_request_cancel_ (pr);
732 GNUNET_free (peerreq);
733}
734
735
736/**
610 * Handle a reply to a pending request. Also called if a request 737 * Handle a reply to a pending request. Also called if a request
611 * expires (then with data == NULL). The handler may be called 738 * expires (then with data == NULL). The handler may be called
612 * many times (depending on the request type), but will not be 739 * many times (depending on the request type), but will not be
@@ -614,8 +741,7 @@ copy_reply (void *cls,
614 * and will also not be called anymore after a call signalling 741 * and will also not be called anymore after a call signalling
615 * expiration. 742 * expiration.
616 * 743 *
617 * @param cls 'struct GSF_ConnectedPeer' of the peer that would 744 * @param cls 'struct PeerRequest' this is an answer for
618 * have liked an answer to the request
619 * @param eval evaluation of the result 745 * @param eval evaluation of the result
620 * @param pr handle to the original pending request 746 * @param pr handle to the original pending request
621 * @param expiration when does 'data' expire? 747 * @param expiration when does 'data' expire?
@@ -632,12 +758,14 @@ handle_p2p_reply (void *cls,
632 const void *data, 758 const void *data,
633 size_t data_len) 759 size_t data_len)
634{ 760{
635 struct GSF_ConnectedPeer *cp = cls; 761 struct PeerRequest *peerreq = cls;
762 struct GSF_ConnectedPeer *cp = peerreq->cp;
636 struct GSF_PendingRequestData *prd; 763 struct GSF_PendingRequestData *prd;
637 struct PutMessage *pm; 764 struct PutMessage *pm;
638 size_t msize; 765 size_t msize;
639 766
640 GNUNET_assert (data_len < GNUNET_SERVER_MAX_MESSAGE_SIZE); 767 GNUNET_assert (data_len < GNUNET_SERVER_MAX_MESSAGE_SIZE);
768 GNUNET_assert (peerreq->pr == pr);
641 prd = GSF_pending_request_get_data_ (pr); 769 prd = GSF_pending_request_get_data_ (pr);
642 if (NULL == data) 770 if (NULL == data)
643 { 771 {
@@ -648,7 +776,8 @@ handle_p2p_reply (void *cls,
648 GNUNET_break (GNUNET_OK == 776 GNUNET_break (GNUNET_OK ==
649 GNUNET_CONTAINER_multihashmap_remove (cp->request_map, 777 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
650 &prd->query, 778 &prd->query,
651 pr)); 779 peerreq));
780 GNUNET_free (peerreq);
652 return; 781 return;
653 } 782 }
654 GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY); 783 GNUNET_break (type != GNUNET_BLOCK_TYPE_ANY);
@@ -687,15 +816,8 @@ handle_p2p_reply (void *cls,
687 pm); 816 pm);
688 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) 817 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
689 return; 818 return;
690 GNUNET_STATISTICS_update (GSF_stats, 819 peerreq->kill_task = GNUNET_SCHEDULER_add_now (&peer_request_destroy,
691 gettext_noop ("# P2P searches active"), 820 peerreq);
692 -1,
693 GNUNET_NO);
694 GNUNET_break (GNUNET_OK ==
695 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
696 &prd->query,
697 pr));
698 GSF_pending_request_cancel_ (pr);
699} 821}
700 822
701 823
@@ -844,6 +966,7 @@ struct GSF_PendingRequest *
844GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, 966GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
845 const struct GNUNET_MessageHeader *message) 967 const struct GNUNET_MessageHeader *message)
846{ 968{
969 struct PeerRequest *peerreq;
847 struct GSF_PendingRequest *pr; 970 struct GSF_PendingRequest *pr;
848 struct GSF_PendingRequestData *prd; 971 struct GSF_PendingRequestData *prd;
849 struct GSF_ConnectedPeer *cp; 972 struct GSF_ConnectedPeer *cp;
@@ -1002,10 +1125,11 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1002 ttl -= ttl_decrement; 1125 ttl -= ttl_decrement;
1003 1126
1004 /* test if the request already exists */ 1127 /* test if the request already exists */
1005 pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map, 1128 peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
1006 &gm->query); 1129 &gm->query);
1007 if (pr != NULL) 1130 if (peerreq != NULL)
1008 { 1131 {
1132 pr = peerreq->pr;
1009 prd = GSF_pending_request_get_data_ (pr); 1133 prd = GSF_pending_request_get_data_ (pr);
1010 if ( (prd->type == type) && 1134 if ( (prd->type == type) &&
1011 ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) || 1135 ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) ||
@@ -1034,10 +1158,15 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1034 GNUNET_assert (GNUNET_YES == 1158 GNUNET_assert (GNUNET_YES ==
1035 GNUNET_CONTAINER_multihashmap_remove (cp->request_map, 1159 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
1036 &gm->query, 1160 &gm->query,
1037 pr)); 1161 peerreq));
1162 if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
1163 GNUNET_SCHEDULER_cancel (peerreq->kill_task);
1164 GNUNET_free (peerreq);
1038 } 1165 }
1039 } 1166 }
1040 1167
1168 peerreq = GNUNET_malloc (sizeof (struct PeerRequest));
1169 peerreq->cp = cp;
1041 pr = GSF_pending_request_create_ (options, 1170 pr = GSF_pending_request_create_ (options,
1042 type, 1171 type,
1043 &gm->query, 1172 &gm->query,
@@ -1052,11 +1181,12 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1052 spid, 1181 spid,
1053 NULL, 0, /* replies_seen */ 1182 NULL, 0, /* replies_seen */
1054 &handle_p2p_reply, 1183 &handle_p2p_reply,
1055 cp); 1184 peerreq);
1185 peerreq->pr = pr;
1056 GNUNET_break (GNUNET_OK == 1186 GNUNET_break (GNUNET_OK ==
1057 GNUNET_CONTAINER_multihashmap_put (cp->request_map, 1187 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
1058 &gm->query, 1188 &gm->query,
1059 pr, 1189 peerreq,
1060 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 1190 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1061 GNUNET_STATISTICS_update (GSF_stats, 1191 GNUNET_STATISTICS_update (GSF_stats,
1062 gettext_noop ("# P2P searches received"), 1192 gettext_noop ("# P2P searches received"),
@@ -1131,9 +1261,6 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1131 struct GSF_PeerTransmitHandle *pth; 1261 struct GSF_PeerTransmitHandle *pth;
1132 struct GSF_PeerTransmitHandle *pos; 1262 struct GSF_PeerTransmitHandle *pos;
1133 struct GSF_PeerTransmitHandle *prev; 1263 struct GSF_PeerTransmitHandle *prev;
1134 struct GNUNET_PeerIdentity target;
1135 uint64_t ip;
1136 int is_ready;
1137 1264
1138 pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle)); 1265 pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
1139 pth->transmission_request_start_time = GNUNET_TIME_absolute_get (); 1266 pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
@@ -1162,79 +1289,15 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1162 cp->pth_tail, 1289 cp->pth_tail,
1163 prev, 1290 prev,
1164 pth); 1291 pth);
1165 GNUNET_PEER_resolve (cp->ppd.pid,
1166 &target);
1167 if (GNUNET_YES == is_query) 1292 if (GNUNET_YES == is_query)
1168 { 1293 cp->ppd.pending_queries++;
1169 /* query, need reservation */
1170 cp->ppd.pending_queries++;
1171 if (GNUNET_YES == cp->did_reserve)
1172 {
1173 cp->did_reserve = GNUNET_NO;
1174 /* reservation already done! */
1175 is_ready = GNUNET_YES;
1176 ip = cp->inc_preference;
1177 cp->inc_preference = 0;
1178 cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
1179 &target,
1180 GNUNET_TIME_UNIT_FOREVER_REL,
1181 GNUNET_BANDWIDTH_VALUE_MAX,
1182 DBLOCK_SIZE,
1183 ip,
1184 &core_reserve_callback,
1185 cp);
1186 }
1187 else
1188 {
1189 /* still waiting for reservation */
1190 is_ready = GNUNET_NO;
1191 }
1192 }
1193 else if (GNUNET_NO == is_query) 1294 else if (GNUNET_NO == is_query)
1194 { 1295 cp->ppd.pending_replies++;
1195 /* no reservation needed for content */ 1296
1196 cp->ppd.pending_replies++; 1297 pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
1197 is_ready = GNUNET_YES; 1298 &peer_transmit_timeout,
1198 }
1199 else
1200 {
1201 /* not a query or content, no reservation needed */
1202 is_ready = GNUNET_YES;
1203 }
1204 if (is_ready)
1205 {
1206 pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
1207 GNUNET_YES,
1208 priority,
1209 timeout,
1210 &target,
1211 size,
1212 &peer_transmit_ready_cb,
1213 pth); 1299 pth);
1214 /* pth->cth could be NULL here, that's OK, we'll try again 1300 schedule_transmission (pth);
1215 later... */
1216 }
1217 else
1218 {
1219#if DEBUG_FS
1220 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221 "Not ready to ask for transmission to `%s'\n",
1222 GNUNET_i2s (&target));
1223#endif
1224 }
1225 if (pth->cth == NULL)
1226 {
1227#if DEBUG_FS
1228 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1229 "No transmission task scheduled, creating timeout task (%llu ms)\n",
1230 (unsigned long long) timeout.rel_value);
1231#endif
1232 /* if we're waiting for reservation OR if we could not do notify_transmit_ready,
1233 install a timeout task to be on the safe side */
1234 pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
1235 &peer_transmit_timeout,
1236 pth);
1237 }
1238 return pth; 1301 return pth;
1239} 1302}
1240 1303
@@ -1364,9 +1427,13 @@ cancel_pending_request (void *cls,
1364 const GNUNET_HashCode *query, 1427 const GNUNET_HashCode *query,
1365 void *value) 1428 void *value)
1366{ 1429{
1367 struct GSF_PendingRequest *pr = value; 1430 struct PeerRequest *peerreq = cls;
1431 struct GSF_PendingRequest *pr = peerreq->pr;
1368 1432
1369 GSF_pending_request_cancel_ (pr); 1433 GSF_pending_request_cancel_ (pr);
1434 if (peerreq->kill_task != GNUNET_SCHEDULER_NO_TASK)
1435 GNUNET_SCHEDULER_cancel (peerreq->kill_task);
1436 GNUNET_free (peerreq);
1370 return GNUNET_OK; 1437 return GNUNET_OK;
1371} 1438}
1372 1439