summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2015-03-01 01:12:22 +0000
committerChristian Grothoff <christian@grothoff.org>2015-03-01 01:12:22 +0000
commitedc0456b8659fdf6c8724aa5da339442b9e9d275 (patch)
treea66c2d51d17e0ac71d72f11a9b5fe544af1cd562 /src
parent3a2737d76679c68331fad0be0b89d8efdcde5079 (diff)
count number of pending replies and refuse to process queries if queue gets too big
Diffstat (limited to 'src')
-rw-r--r--src/fs/gnunet-service-fs_cp.c144
1 files changed, 105 insertions, 39 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 749ef15c7..eff8286fc 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -86,7 +86,7 @@ struct GSF_PeerTransmitHandle
/**
* Task called on timeout, or 0 for none.
*/
- struct GNUNET_SCHEDULER_Task * timeout_task;
+ struct GNUNET_SCHEDULER_Task *timeout_task;
/**
* Function to call to get the actual message.
@@ -155,7 +155,7 @@ struct GSF_DelayedHandle
/**
* Task for the delay.
*/
- struct GNUNET_SCHEDULER_Task * delay_task;
+ struct GNUNET_SCHEDULER_Task *delay_task;
/**
* Size of the message.
@@ -184,7 +184,7 @@ struct PeerRequest
/**
* Task for asynchronous stopping of this request.
*/
- struct GNUNET_SCHEDULER_Task * kill_task;
+ struct GNUNET_SCHEDULER_Task *kill_task;
};
@@ -209,7 +209,7 @@ struct GSF_ConnectedPeer
/**
* Task scheduled to revive migration to this peer.
*/
- struct GNUNET_SCHEDULER_Task * mig_revive_task;
+ struct GNUNET_SCHEDULER_Task *mig_revive_task;
/**
* Messages (replies, queries, content migration) we would like to
@@ -248,7 +248,7 @@ struct GSF_ConnectedPeer
/**
* Task scheduled if we need to retry bandwidth reservation later.
*/
- struct GNUNET_SCHEDULER_Task * rc_delay_task;
+ struct GNUNET_SCHEDULER_Task *rc_delay_task;
/**
* Active requests from this neighbour, map of query to 'struct PeerRequest'.
@@ -276,6 +276,11 @@ struct GSF_ConnectedPeer
unsigned int cth_in_progress;
/**
+ * Number of entries in @e delayed_head DLL.
+ */
+ unsigned int delay_queue_size;
+
+ /**
* Respect rating for this peer on disk.
*/
uint32_t disk_respect;
@@ -298,8 +303,8 @@ struct GSF_ConnectedPeer
unsigned int last_request_times_off;
/**
- * GNUNET_YES if we did successfully reserve 32k bandwidth,
- * GNUNET_NO if not.
+ * #GNUNET_YES if we did successfully reserve 32k bandwidth,
+ * #GNUNET_NO if not.
*/
int did_reserve;
@@ -439,10 +444,13 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth)
GNUNET_assert (NULL == cp->cth);
cp->cth_in_progress++;
cp->cth =
- GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES,
+ GNUNET_CORE_notify_transmit_ready (GSF_core,
+ GNUNET_YES,
GNUNET_CORE_PRIO_BACKGROUND,
GNUNET_TIME_absolute_get_remaining
- (pth->timeout), &target, pth->size,
+ (pth->timeout),
+ &target,
+ pth->size,
&peer_transmit_ready_cb, cp);
GNUNET_assert (NULL != cp->cth);
GNUNET_assert (0 < cp->cth_in_progress--);
@@ -458,7 +466,9 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth)
* @return number of bytes copied to @a buf
*/
static size_t
-peer_transmit_ready_cb (void *cls, size_t size, void *buf)
+peer_transmit_ready_cb (void *cls,
+ size_t size,
+ void *buf)
{
struct GSF_ConnectedPeer *cp = cls;
struct GSF_PeerTransmitHandle *pth = cp->pth_head;
@@ -478,7 +488,9 @@ peer_transmit_ready_cb (void *cls, size_t size, void *buf)
GNUNET_SCHEDULER_cancel (pth->timeout_task);
pth->timeout_task = NULL;
}
- GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
if (GNUNET_YES == pth->is_query)
{
cp->ppd.last_request_times[(cp->last_request_times_off++) %
@@ -511,7 +523,8 @@ peer_transmit_ready_cb (void *cls, size_t size, void *buf)
* @param tc scheduler context
*/
static void
-retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+retry_reservation (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GSF_ConnectedPeer *cp = cls;
struct GNUNET_PeerIdentity target;
@@ -519,7 +532,9 @@ retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
GNUNET_PEER_resolve (cp->ppd.pid, &target);
cp->rc_delay_task = NULL;
cp->rc =
- GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
+ GNUNET_ATS_reserve_bandwidth (GSF_ats,
+ &target,
+ DBLOCK_SIZE,
&ats_reserve_callback, cp);
}
@@ -736,7 +751,9 @@ GSF_handle_p2p_migration_stop_ (void *cls,
* @return number of bytes copied to @a buf, can be 0 (without indicating an error)
*/
static size_t
-copy_reply (void *cls, size_t buf_size, void *buf)
+copy_reply (void *cls,
+ size_t buf_size,
+ void *buf)
{
struct PutMessage *pm = cls;
size_t size;
@@ -845,15 +862,23 @@ transmit_delayed_now (void *cls,
struct GSF_DelayedHandle *dh = cls;
struct GSF_ConnectedPeer *cp = dh->cp;
- GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
+ GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
+ cp->delayed_tail,
+ dh);
+ cp->delay_queue_size--;
if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
{
GNUNET_free (dh->pm);
GNUNET_free (dh);
return;
}
- (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
- dh->msize, &copy_reply, dh->pm);
+ (void) GSF_peer_transmit_ (cp,
+ GNUNET_NO,
+ UINT32_MAX,
+ REPLY_TIMEOUT,
+ dh->msize,
+ &copy_reply,
+ dh->pm);
GNUNET_free (dh);
}
@@ -967,8 +992,9 @@ handle_p2p_reply (void *cls,
pm->type = htonl (type);
pm->expiration = GNUNET_TIME_absolute_hton (expiration);
memcpy (&pm[1], data, data_len);
- if ((UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) &&
- (GNUNET_YES == GSF_enable_randomized_delays))
+ if ( (UINT32_MAX != reply_anonymity_level) &&
+ (0 != reply_anonymity_level) &&
+ (GNUNET_YES == GSF_enable_randomized_delays) )
{
struct GSF_DelayedHandle *dh;
@@ -976,15 +1002,24 @@ handle_p2p_reply (void *cls,
dh->cp = cp;
dh->pm = pm;
dh->msize = msize;
- GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
+ GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
+ cp->delayed_tail,
+ dh);
+ cp->delay_queue_size++;
dh->delay_task =
GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
- &transmit_delayed_now, dh);
+ &transmit_delayed_now,
+ dh);
}
else
{
- (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
- &copy_reply, pm);
+ (void) GSF_peer_transmit_ (cp,
+ GNUNET_NO,
+ UINT32_MAX,
+ REPLY_TIMEOUT,
+ msize,
+ &copy_reply,
+ pm);
}
if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
return;
@@ -1164,7 +1199,6 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
enum GNUNET_BLOCK_Type type;
GNUNET_PEER_Id spid;
- GNUNET_assert (other != NULL);
msize = ntohs (message->size);
if (msize < sizeof (struct GetMessage))
{
@@ -1173,7 +1207,8 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
}
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
- ("# GET requests received (from other peers)"), 1,
+ ("# GET requests received (from other peers)"),
+ 1,
GNUNET_NO);
gm = (const struct GetMessage *) message;
type = ntohl (gm->type);
@@ -1219,25 +1254,36 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
{
if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
+ "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
GNUNET_i2s (&opt[bits - 1]));
else
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to find peer `%4s' in connection set. Dropping query.\n",
+ "Failed to find peer `%s' in connection set. Dropping query.\n",
GNUNET_i2s (other));
-#if INSANE_STATISTICS
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# requests dropped due to missing reverse route"),
- 1, GNUNET_NO);
-#endif
+ 1,
+ GNUNET_NO);
+ return NULL;
+ }
+ if (cp->ppd.pending_replies + cp->delay_queue_size > 128)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer `%s' has too many replies queued already. Dropping query.\n",
+ GNUNET_i2s (other));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# requests dropped due to full reply queue"),
+ 1,
+ GNUNET_NO);
return NULL;
}
/* note that we can really only check load here since otherwise
* peers could find out that we are overloaded by not being
* disconnected after sending us a malformed query... */
- priority = bound_priority (ntohl (gm->priority), cps);
+ priority = bound_priority (ntohl (gm->priority),
+ cps);
if (priority < 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1246,7 +1292,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
return NULL;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
+ "Received request for `%s' of type %u from peer `%s' with flags %u\n",
GNUNET_h2s (&gm->query),
(unsigned int) type,
GNUNET_i2s (other),
@@ -1359,7 +1405,9 @@ peer_transmit_timeout (void *cls,
"Timeout trying to transmit to other peer\n");
pth->timeout_task = NULL;
cp = pth->cp;
- GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
if (GNUNET_YES == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_queries--);
else if (GNUNET_NO == pth->is_query)
@@ -1419,13 +1467,18 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
prev = pos;
pos = pos->next;
}
- GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth);
+ GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
+ cp->pth_tail,
+ prev,
+ pth);
if (GNUNET_YES == is_query)
cp->ppd.pending_queries++;
else if (GNUNET_NO == is_query)
cp->ppd.pending_replies++;
- pth->timeout_task =
- GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
+ pth->timeout_task
+ = GNUNET_SCHEDULER_add_delayed (timeout,
+ &peer_transmit_timeout,
+ pth);
schedule_transmission (pth);
return pth;
}
@@ -1447,7 +1500,9 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
pth->timeout_task = NULL;
}
cp = pth->cp;
- GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
if (GNUNET_YES == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_queries--);
else if (GNUNET_NO == pth->is_query)
@@ -1614,13 +1669,22 @@ GSF_peer_disconnect_handler_ (void *cls,
GNUNET_SCHEDULER_cancel (pth->timeout_task);
pth->timeout_task = NULL;
}
- GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
+ if (GNUNET_YES == pth->is_query)
+ GNUNET_assert (0 < cp->ppd.pending_queries--);
+ else if (GNUNET_NO == pth->is_query)
+ GNUNET_assert (0 < cp->ppd.pending_replies--);
pth->gmc (pth->gmc_cls, 0, NULL);
GNUNET_free (pth);
}
while (NULL != (dh = cp->delayed_head))
{
- GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
+ GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
+ cp->delayed_tail,
+ dh);
+ cp->delay_queue_size--;
GNUNET_SCHEDULER_cancel (dh->delay_task);
GNUNET_free (dh->pm);
GNUNET_free (dh);
@@ -1631,6 +1695,8 @@ GSF_peer_disconnect_handler_ (void *cls,
GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
cp->mig_revive_task = NULL;
}
+ GNUNET_break (0 == cp->ppd.pending_queries);
+ GNUNET_break (0 == cp->ppd.pending_replies);
GNUNET_free (cp);
}