aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2015-03-01 01:12:22 +0000
committerChristian Grothoff <christian@grothoff.org>2015-03-01 01:12:22 +0000
commitedc0456b8659fdf6c8724aa5da339442b9e9d275 (patch)
treea66c2d51d17e0ac71d72f11a9b5fe544af1cd562
parent3a2737d76679c68331fad0be0b89d8efdcde5079 (diff)
downloadgnunet-edc0456b8659fdf6c8724aa5da339442b9e9d275.tar.gz
gnunet-edc0456b8659fdf6c8724aa5da339442b9e9d275.zip
count number of pending replies and refuse to process queries if queue gets too big
-rw-r--r--src/fs/gnunet-service-fs_cp.c144
1 files changed, 105 insertions, 39 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 749ef15c7..eff8286fc 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -86,7 +86,7 @@ struct GSF_PeerTransmitHandle
86 /** 86 /**
87 * Task called on timeout, or 0 for none. 87 * Task called on timeout, or 0 for none.
88 */ 88 */
89 struct GNUNET_SCHEDULER_Task * timeout_task; 89 struct GNUNET_SCHEDULER_Task *timeout_task;
90 90
91 /** 91 /**
92 * Function to call to get the actual message. 92 * Function to call to get the actual message.
@@ -155,7 +155,7 @@ struct GSF_DelayedHandle
155 /** 155 /**
156 * Task for the delay. 156 * Task for the delay.
157 */ 157 */
158 struct GNUNET_SCHEDULER_Task * delay_task; 158 struct GNUNET_SCHEDULER_Task *delay_task;
159 159
160 /** 160 /**
161 * Size of the message. 161 * Size of the message.
@@ -184,7 +184,7 @@ struct PeerRequest
184 /** 184 /**
185 * Task for asynchronous stopping of this request. 185 * Task for asynchronous stopping of this request.
186 */ 186 */
187 struct GNUNET_SCHEDULER_Task * kill_task; 187 struct GNUNET_SCHEDULER_Task *kill_task;
188 188
189}; 189};
190 190
@@ -209,7 +209,7 @@ struct GSF_ConnectedPeer
209 /** 209 /**
210 * Task scheduled to revive migration to this peer. 210 * Task scheduled to revive migration to this peer.
211 */ 211 */
212 struct GNUNET_SCHEDULER_Task * mig_revive_task; 212 struct GNUNET_SCHEDULER_Task *mig_revive_task;
213 213
214 /** 214 /**
215 * Messages (replies, queries, content migration) we would like to 215 * Messages (replies, queries, content migration) we would like to
@@ -248,7 +248,7 @@ struct GSF_ConnectedPeer
248 /** 248 /**
249 * Task scheduled if we need to retry bandwidth reservation later. 249 * Task scheduled if we need to retry bandwidth reservation later.
250 */ 250 */
251 struct GNUNET_SCHEDULER_Task * rc_delay_task; 251 struct GNUNET_SCHEDULER_Task *rc_delay_task;
252 252
253 /** 253 /**
254 * Active requests from this neighbour, map of query to 'struct PeerRequest'. 254 * Active requests from this neighbour, map of query to 'struct PeerRequest'.
@@ -276,6 +276,11 @@ struct GSF_ConnectedPeer
276 unsigned int cth_in_progress; 276 unsigned int cth_in_progress;
277 277
278 /** 278 /**
279 * Number of entries in @e delayed_head DLL.
280 */
281 unsigned int delay_queue_size;
282
283 /**
279 * Respect rating for this peer on disk. 284 * Respect rating for this peer on disk.
280 */ 285 */
281 uint32_t disk_respect; 286 uint32_t disk_respect;
@@ -298,8 +303,8 @@ struct GSF_ConnectedPeer
298 unsigned int last_request_times_off; 303 unsigned int last_request_times_off;
299 304
300 /** 305 /**
301 * GNUNET_YES if we did successfully reserve 32k bandwidth, 306 * #GNUNET_YES if we did successfully reserve 32k bandwidth,
302 * GNUNET_NO if not. 307 * #GNUNET_NO if not.
303 */ 308 */
304 int did_reserve; 309 int did_reserve;
305 310
@@ -439,10 +444,13 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth)
439 GNUNET_assert (NULL == cp->cth); 444 GNUNET_assert (NULL == cp->cth);
440 cp->cth_in_progress++; 445 cp->cth_in_progress++;
441 cp->cth = 446 cp->cth =
442 GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES, 447 GNUNET_CORE_notify_transmit_ready (GSF_core,
448 GNUNET_YES,
443 GNUNET_CORE_PRIO_BACKGROUND, 449 GNUNET_CORE_PRIO_BACKGROUND,
444 GNUNET_TIME_absolute_get_remaining 450 GNUNET_TIME_absolute_get_remaining
445 (pth->timeout), &target, pth->size, 451 (pth->timeout),
452 &target,
453 pth->size,
446 &peer_transmit_ready_cb, cp); 454 &peer_transmit_ready_cb, cp);
447 GNUNET_assert (NULL != cp->cth); 455 GNUNET_assert (NULL != cp->cth);
448 GNUNET_assert (0 < cp->cth_in_progress--); 456 GNUNET_assert (0 < cp->cth_in_progress--);
@@ -458,7 +466,9 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth)
458 * @return number of bytes copied to @a buf 466 * @return number of bytes copied to @a buf
459 */ 467 */
460static size_t 468static size_t
461peer_transmit_ready_cb (void *cls, size_t size, void *buf) 469peer_transmit_ready_cb (void *cls,
470 size_t size,
471 void *buf)
462{ 472{
463 struct GSF_ConnectedPeer *cp = cls; 473 struct GSF_ConnectedPeer *cp = cls;
464 struct GSF_PeerTransmitHandle *pth = cp->pth_head; 474 struct GSF_PeerTransmitHandle *pth = cp->pth_head;
@@ -478,7 +488,9 @@ peer_transmit_ready_cb (void *cls, size_t size, void *buf)
478 GNUNET_SCHEDULER_cancel (pth->timeout_task); 488 GNUNET_SCHEDULER_cancel (pth->timeout_task);
479 pth->timeout_task = NULL; 489 pth->timeout_task = NULL;
480 } 490 }
481 GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth); 491 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
492 cp->pth_tail,
493 pth);
482 if (GNUNET_YES == pth->is_query) 494 if (GNUNET_YES == pth->is_query)
483 { 495 {
484 cp->ppd.last_request_times[(cp->last_request_times_off++) % 496 cp->ppd.last_request_times[(cp->last_request_times_off++) %
@@ -511,7 +523,8 @@ peer_transmit_ready_cb (void *cls, size_t size, void *buf)
511 * @param tc scheduler context 523 * @param tc scheduler context
512 */ 524 */
513static void 525static void
514retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 526retry_reservation (void *cls,
527 const struct GNUNET_SCHEDULER_TaskContext *tc)
515{ 528{
516 struct GSF_ConnectedPeer *cp = cls; 529 struct GSF_ConnectedPeer *cp = cls;
517 struct GNUNET_PeerIdentity target; 530 struct GNUNET_PeerIdentity target;
@@ -519,7 +532,9 @@ retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
519 GNUNET_PEER_resolve (cp->ppd.pid, &target); 532 GNUNET_PEER_resolve (cp->ppd.pid, &target);
520 cp->rc_delay_task = NULL; 533 cp->rc_delay_task = NULL;
521 cp->rc = 534 cp->rc =
522 GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE, 535 GNUNET_ATS_reserve_bandwidth (GSF_ats,
536 &target,
537 DBLOCK_SIZE,
523 &ats_reserve_callback, cp); 538 &ats_reserve_callback, cp);
524} 539}
525 540
@@ -736,7 +751,9 @@ GSF_handle_p2p_migration_stop_ (void *cls,
736 * @return number of bytes copied to @a buf, can be 0 (without indicating an error) 751 * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
737 */ 752 */
738static size_t 753static size_t
739copy_reply (void *cls, size_t buf_size, void *buf) 754copy_reply (void *cls,
755 size_t buf_size,
756 void *buf)
740{ 757{
741 struct PutMessage *pm = cls; 758 struct PutMessage *pm = cls;
742 size_t size; 759 size_t size;
@@ -845,15 +862,23 @@ transmit_delayed_now (void *cls,
845 struct GSF_DelayedHandle *dh = cls; 862 struct GSF_DelayedHandle *dh = cls;
846 struct GSF_ConnectedPeer *cp = dh->cp; 863 struct GSF_ConnectedPeer *cp = dh->cp;
847 864
848 GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh); 865 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
866 cp->delayed_tail,
867 dh);
868 cp->delay_queue_size--;
849 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason)) 869 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
850 { 870 {
851 GNUNET_free (dh->pm); 871 GNUNET_free (dh->pm);
852 GNUNET_free (dh); 872 GNUNET_free (dh);
853 return; 873 return;
854 } 874 }
855 (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, 875 (void) GSF_peer_transmit_ (cp,
856 dh->msize, &copy_reply, dh->pm); 876 GNUNET_NO,
877 UINT32_MAX,
878 REPLY_TIMEOUT,
879 dh->msize,
880 &copy_reply,
881 dh->pm);
857 GNUNET_free (dh); 882 GNUNET_free (dh);
858} 883}
859 884
@@ -967,8 +992,9 @@ handle_p2p_reply (void *cls,
967 pm->type = htonl (type); 992 pm->type = htonl (type);
968 pm->expiration = GNUNET_TIME_absolute_hton (expiration); 993 pm->expiration = GNUNET_TIME_absolute_hton (expiration);
969 memcpy (&pm[1], data, data_len); 994 memcpy (&pm[1], data, data_len);
970 if ((UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) && 995 if ( (UINT32_MAX != reply_anonymity_level) &&
971 (GNUNET_YES == GSF_enable_randomized_delays)) 996 (0 != reply_anonymity_level) &&
997 (GNUNET_YES == GSF_enable_randomized_delays) )
972 { 998 {
973 struct GSF_DelayedHandle *dh; 999 struct GSF_DelayedHandle *dh;
974 1000
@@ -976,15 +1002,24 @@ handle_p2p_reply (void *cls,
976 dh->cp = cp; 1002 dh->cp = cp;
977 dh->pm = pm; 1003 dh->pm = pm;
978 dh->msize = msize; 1004 dh->msize = msize;
979 GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh); 1005 GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
1006 cp->delayed_tail,
1007 dh);
1008 cp->delay_queue_size++;
980 dh->delay_task = 1009 dh->delay_task =
981 GNUNET_SCHEDULER_add_delayed (get_randomized_delay (), 1010 GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
982 &transmit_delayed_now, dh); 1011 &transmit_delayed_now,
1012 dh);
983 } 1013 }
984 else 1014 else
985 { 1015 {
986 (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize, 1016 (void) GSF_peer_transmit_ (cp,
987 &copy_reply, pm); 1017 GNUNET_NO,
1018 UINT32_MAX,
1019 REPLY_TIMEOUT,
1020 msize,
1021 &copy_reply,
1022 pm);
988 } 1023 }
989 if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval) 1024 if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
990 return; 1025 return;
@@ -1164,7 +1199,6 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1164 enum GNUNET_BLOCK_Type type; 1199 enum GNUNET_BLOCK_Type type;
1165 GNUNET_PEER_Id spid; 1200 GNUNET_PEER_Id spid;
1166 1201
1167 GNUNET_assert (other != NULL);
1168 msize = ntohs (message->size); 1202 msize = ntohs (message->size);
1169 if (msize < sizeof (struct GetMessage)) 1203 if (msize < sizeof (struct GetMessage))
1170 { 1204 {
@@ -1173,7 +1207,8 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1173 } 1207 }
1174 GNUNET_STATISTICS_update (GSF_stats, 1208 GNUNET_STATISTICS_update (GSF_stats,
1175 gettext_noop 1209 gettext_noop
1176 ("# GET requests received (from other peers)"), 1, 1210 ("# GET requests received (from other peers)"),
1211 1,
1177 GNUNET_NO); 1212 GNUNET_NO);
1178 gm = (const struct GetMessage *) message; 1213 gm = (const struct GetMessage *) message;
1179 type = ntohl (gm->type); 1214 type = ntohl (gm->type);
@@ -1219,25 +1254,36 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1219 { 1254 {
1220 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) 1255 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1221 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1256 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1222 "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n", 1257 "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1223 GNUNET_i2s (&opt[bits - 1])); 1258 GNUNET_i2s (&opt[bits - 1]));
1224 1259
1225 else 1260 else
1226 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1261 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1227 "Failed to find peer `%4s' in connection set. Dropping query.\n", 1262 "Failed to find peer `%s' in connection set. Dropping query.\n",
1228 GNUNET_i2s (other)); 1263 GNUNET_i2s (other));
1229#if INSANE_STATISTICS
1230 GNUNET_STATISTICS_update (GSF_stats, 1264 GNUNET_STATISTICS_update (GSF_stats,
1231 gettext_noop 1265 gettext_noop
1232 ("# requests dropped due to missing reverse route"), 1266 ("# requests dropped due to missing reverse route"),
1233 1, GNUNET_NO); 1267 1,
1234#endif 1268 GNUNET_NO);
1269 return NULL;
1270 }
1271 if (cp->ppd.pending_replies + cp->delay_queue_size > 128)
1272 {
1273 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1274 "Peer `%s' has too many replies queued already. Dropping query.\n",
1275 GNUNET_i2s (other));
1276 GNUNET_STATISTICS_update (GSF_stats,
1277 gettext_noop ("# requests dropped due to full reply queue"),
1278 1,
1279 GNUNET_NO);
1235 return NULL; 1280 return NULL;
1236 } 1281 }
1237 /* note that we can really only check load here since otherwise 1282 /* note that we can really only check load here since otherwise
1238 * peers could find out that we are overloaded by not being 1283 * peers could find out that we are overloaded by not being
1239 * disconnected after sending us a malformed query... */ 1284 * disconnected after sending us a malformed query... */
1240 priority = bound_priority (ntohl (gm->priority), cps); 1285 priority = bound_priority (ntohl (gm->priority),
1286 cps);
1241 if (priority < 0) 1287 if (priority < 0)
1242 { 1288 {
1243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1246,7 +1292,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1246 return NULL; 1292 return NULL;
1247 } 1293 }
1248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1249 "Received request for `%s' of type %u from peer `%4s' with flags %u\n", 1295 "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1250 GNUNET_h2s (&gm->query), 1296 GNUNET_h2s (&gm->query),
1251 (unsigned int) type, 1297 (unsigned int) type,
1252 GNUNET_i2s (other), 1298 GNUNET_i2s (other),
@@ -1359,7 +1405,9 @@ peer_transmit_timeout (void *cls,
1359 "Timeout trying to transmit to other peer\n"); 1405 "Timeout trying to transmit to other peer\n");
1360 pth->timeout_task = NULL; 1406 pth->timeout_task = NULL;
1361 cp = pth->cp; 1407 cp = pth->cp;
1362 GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth); 1408 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1409 cp->pth_tail,
1410 pth);
1363 if (GNUNET_YES == pth->is_query) 1411 if (GNUNET_YES == pth->is_query)
1364 GNUNET_assert (0 < cp->ppd.pending_queries--); 1412 GNUNET_assert (0 < cp->ppd.pending_queries--);
1365 else if (GNUNET_NO == pth->is_query) 1413 else if (GNUNET_NO == pth->is_query)
@@ -1419,13 +1467,18 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1419 prev = pos; 1467 prev = pos;
1420 pos = pos->next; 1468 pos = pos->next;
1421 } 1469 }
1422 GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth); 1470 GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1471 cp->pth_tail,
1472 prev,
1473 pth);
1423 if (GNUNET_YES == is_query) 1474 if (GNUNET_YES == is_query)
1424 cp->ppd.pending_queries++; 1475 cp->ppd.pending_queries++;
1425 else if (GNUNET_NO == is_query) 1476 else if (GNUNET_NO == is_query)
1426 cp->ppd.pending_replies++; 1477 cp->ppd.pending_replies++;
1427 pth->timeout_task = 1478 pth->timeout_task
1428 GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth); 1479 = GNUNET_SCHEDULER_add_delayed (timeout,
1480 &peer_transmit_timeout,
1481 pth);
1429 schedule_transmission (pth); 1482 schedule_transmission (pth);
1430 return pth; 1483 return pth;
1431} 1484}
@@ -1447,7 +1500,9 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1447 pth->timeout_task = NULL; 1500 pth->timeout_task = NULL;
1448 } 1501 }
1449 cp = pth->cp; 1502 cp = pth->cp;
1450 GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth); 1503 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1504 cp->pth_tail,
1505 pth);
1451 if (GNUNET_YES == pth->is_query) 1506 if (GNUNET_YES == pth->is_query)
1452 GNUNET_assert (0 < cp->ppd.pending_queries--); 1507 GNUNET_assert (0 < cp->ppd.pending_queries--);
1453 else if (GNUNET_NO == pth->is_query) 1508 else if (GNUNET_NO == pth->is_query)
@@ -1614,13 +1669,22 @@ GSF_peer_disconnect_handler_ (void *cls,
1614 GNUNET_SCHEDULER_cancel (pth->timeout_task); 1669 GNUNET_SCHEDULER_cancel (pth->timeout_task);
1615 pth->timeout_task = NULL; 1670 pth->timeout_task = NULL;
1616 } 1671 }
1617 GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth); 1672 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1673 cp->pth_tail,
1674 pth);
1675 if (GNUNET_YES == pth->is_query)
1676 GNUNET_assert (0 < cp->ppd.pending_queries--);
1677 else if (GNUNET_NO == pth->is_query)
1678 GNUNET_assert (0 < cp->ppd.pending_replies--);
1618 pth->gmc (pth->gmc_cls, 0, NULL); 1679 pth->gmc (pth->gmc_cls, 0, NULL);
1619 GNUNET_free (pth); 1680 GNUNET_free (pth);
1620 } 1681 }
1621 while (NULL != (dh = cp->delayed_head)) 1682 while (NULL != (dh = cp->delayed_head))
1622 { 1683 {
1623 GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh); 1684 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1685 cp->delayed_tail,
1686 dh);
1687 cp->delay_queue_size--;
1624 GNUNET_SCHEDULER_cancel (dh->delay_task); 1688 GNUNET_SCHEDULER_cancel (dh->delay_task);
1625 GNUNET_free (dh->pm); 1689 GNUNET_free (dh->pm);
1626 GNUNET_free (dh); 1690 GNUNET_free (dh);
@@ -1631,6 +1695,8 @@ GSF_peer_disconnect_handler_ (void *cls,
1631 GNUNET_SCHEDULER_cancel (cp->mig_revive_task); 1695 GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1632 cp->mig_revive_task = NULL; 1696 cp->mig_revive_task = NULL;
1633 } 1697 }
1698 GNUNET_break (0 == cp->ppd.pending_queries);
1699 GNUNET_break (0 == cp->ppd.pending_replies);
1634 GNUNET_free (cp); 1700 GNUNET_free (cp);
1635} 1701}
1636 1702