summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2018-04-30 10:55:26 +0200
committerChristian Grothoff <christian@grothoff.org>2018-04-30 10:55:26 +0200
commit7fef1456bd44bacaf5aa927c89282a31e89bdcf7 (patch)
tree4718a868fe624a2d7a78f580f33f5f4bf0d6ab1a /src
parentbdf8e76fc9b72f2000e33f479a15919811a5f312 (diff)
enable more parallelism with DHT queue, but limit to 1000 entries, then kill hard
Diffstat (limited to 'src')
-rw-r--r--src/dht/dht_api.c4
-rw-r--r--src/util/mq.c4
-rw-r--r--src/zonemaster/gnunet-service-zonemaster.c56
3 files changed, 53 insertions, 11 deletions
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index 42ddc7b60..7a0771de0 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -1028,8 +1028,8 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
put_msg->key = *key;
GNUNET_memcpy (&put_msg[1],
- data,
- size);
+ data,
+ size);
GNUNET_MQ_send (handle->mq,
env);
return ph;
diff --git a/src/util/mq.c b/src/util/mq.c
index af700836c..0f9ad9a12 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -578,11 +578,9 @@ void
GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
void *handlers_cls)
{
- unsigned int i;
-
if (NULL == mq->handlers)
return;
- for (i=0;NULL != mq->handlers[i].cb; i++)
+ for (unsigned int i=0;NULL != mq->handlers[i].cb; i++)
mq->handlers[i].cls = handlers_cls;
}
diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c
index 5c3356784..b45ed576c 100644
--- a/src/zonemaster/gnunet-service-zonemaster.c
+++ b/src/zonemaster/gnunet-service-zonemaster.c
@@ -50,6 +50,11 @@
#define NS_BLOCK_SIZE 100
/**
+ * How many pending DHT operations do we allow at most?
+ */
+#define DHT_QUEUE_LIMIT 1000
+
+/**
* The initial interval in milliseconds btween puts in
* a zone iteration
*/
@@ -107,6 +112,11 @@ struct DhtPutActivity
* Handle for the DHT PUT operation.
*/
struct GNUNET_DHT_PutHandle *ph;
+
+ /**
+ * When was this PUT initiated?
+ */
+ struct GNUNET_TIME_Absolute start_date;
};
@@ -161,6 +171,11 @@ static struct DhtPutActivity *it_head;
static struct DhtPutActivity *it_tail;
/**
+ * Number of entries in the DHT queue.
+ */
+static unsigned int dht_queue_length;
+
+/**
* Useful for zone update for DHT put
*/
static unsigned long long num_public_records;
@@ -265,6 +280,15 @@ shutdown_task (void *cls)
ma);
GNUNET_free (ma);
}
+ while (NULL != (ma = it_head))
+ {
+ GNUNET_DHT_put_cancel (ma->ph);
+ GNUNET_CONTAINER_DLL_remove (it_head,
+ it_tail,
+ ma);
+ dht_queue_length--;
+ GNUNET_free (ma);
+ }
if (NULL != statistics)
{
GNUNET_STATISTICS_destroy (statistics,
@@ -362,8 +386,6 @@ check_zone_dht_next ()
if (0 != ns_iteration_left)
return; /* current NAMESTORE iteration not yet done */
- if (NULL != it_head)
- return; /* waiting on DHT */
delay = GNUNET_TIME_relative_subtract (next_put_interval,
sub_delta);
/* We delay *once* per #NS_BLOCK_SIZE, so we need to multiply the
@@ -472,6 +494,10 @@ update_velocity ()
}
}
GNUNET_STATISTICS_set (statistics,
+ "# size of the DHT queue",
+ dht_queue_length,
+ GNUNET_NO);
+ GNUNET_STATISTICS_set (statistics,
"% speed increase needed for target velocity",
pct,
GNUNET_NO);
@@ -498,6 +524,7 @@ dht_put_continuation (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"PUT complete (%s)\n",
(GNUNET_OK == success) ? "success" : "failure");
+ dht_queue_length--;
GNUNET_CONTAINER_DLL_remove (it_head,
it_tail,
ma);
@@ -508,7 +535,6 @@ dht_put_continuation (void *cls,
if (0 == put_cnt % DELTA_INTERVAL)
update_velocity ();
}
- check_zone_dht_next ();
}
@@ -766,6 +792,7 @@ put_gns_record (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting DHT PUT\n");
ma = GNUNET_new (struct DhtPutActivity);
+ ma->start_date = GNUNET_TIME_absolute_get ();
ma->ph = perform_dht_put (key,
label,
rd_public,
@@ -780,9 +807,24 @@ put_gns_record (void *cls,
check_zone_dht_next ();
return;
}
- GNUNET_CONTAINER_DLL_insert (it_head,
- it_tail,
- ma);
+ dht_queue_length++;
+ GNUNET_CONTAINER_DLL_insert_tail (it_head,
+ it_tail,
+ ma);
+ if (dht_queue_length > DHT_QUEUE_LIMIT)
+ {
+ ma = it_head;
+ GNUNET_CONTAINER_DLL_remove (it_head,
+ it_tail,
+ ma);
+ GNUNET_DHT_put_cancel (ma->ph);
+ dht_queue_length--;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "DHT PUT unconfirmed after %s, aborting PUT\n",
+ GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (ma->start_date),
+ GNUNET_YES));
+ GNUNET_free (ma);
+ }
}
@@ -815,6 +857,7 @@ publish_zone_dht_start (void *cls)
NULL,
&zone_iteration_finished,
NULL);
+ GNUNET_assert (NULL != namestore_iter);
}
@@ -855,6 +898,7 @@ handle_monitor_event (void *cls,
if (0 == rd_public_count)
return; /* nothing to do */
ma = GNUNET_new (struct DhtPutActivity);
+ ma->start_date = GNUNET_TIME_absolute_get ();
ma->ph = perform_dht_put (zone,
label,
rd,