aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2018-04-30 10:55:26 +0200
committerChristian Grothoff <christian@grothoff.org>2018-04-30 10:55:26 +0200
commit7fef1456bd44bacaf5aa927c89282a31e89bdcf7 (patch)
tree4718a868fe624a2d7a78f580f33f5f4bf0d6ab1a
parentbdf8e76fc9b72f2000e33f479a15919811a5f312 (diff)
downloadgnunet-7fef1456bd44bacaf5aa927c89282a31e89bdcf7.tar.gz
gnunet-7fef1456bd44bacaf5aa927c89282a31e89bdcf7.zip
enable more parallelism with DHT queue, but limit to 1000 entries, then kill hard
-rw-r--r--src/dht/dht_api.c4
-rw-r--r--src/util/mq.c4
-rw-r--r--src/zonemaster/gnunet-service-zonemaster.c56
3 files changed, 53 insertions, 11 deletions
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index 42ddc7b60..7a0771de0 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -1028,8 +1028,8 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
1028 put_msg->expiration = GNUNET_TIME_absolute_hton (exp); 1028 put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
1029 put_msg->key = *key; 1029 put_msg->key = *key;
1030 GNUNET_memcpy (&put_msg[1], 1030 GNUNET_memcpy (&put_msg[1],
1031 data, 1031 data,
1032 size); 1032 size);
1033 GNUNET_MQ_send (handle->mq, 1033 GNUNET_MQ_send (handle->mq,
1034 env); 1034 env);
1035 return ph; 1035 return ph;
diff --git a/src/util/mq.c b/src/util/mq.c
index af700836c..0f9ad9a12 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -578,11 +578,9 @@ void
578GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, 578GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq,
579 void *handlers_cls) 579 void *handlers_cls)
580{ 580{
581 unsigned int i;
582
583 if (NULL == mq->handlers) 581 if (NULL == mq->handlers)
584 return; 582 return;
585 for (i=0;NULL != mq->handlers[i].cb; i++) 583 for (unsigned int i=0;NULL != mq->handlers[i].cb; i++)
586 mq->handlers[i].cls = handlers_cls; 584 mq->handlers[i].cls = handlers_cls;
587} 585}
588 586
diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c
index 5c3356784..b45ed576c 100644
--- a/src/zonemaster/gnunet-service-zonemaster.c
+++ b/src/zonemaster/gnunet-service-zonemaster.c
@@ -50,6 +50,11 @@
50#define NS_BLOCK_SIZE 100 50#define NS_BLOCK_SIZE 100
51 51
52/** 52/**
53 * How many pending DHT operations do we allow at most?
54 */
55#define DHT_QUEUE_LIMIT 1000
56
57/**
53 * The initial interval in milliseconds btween puts in 58 * The initial interval in milliseconds btween puts in
54 * a zone iteration 59 * a zone iteration
55 */ 60 */
@@ -107,6 +112,11 @@ struct DhtPutActivity
107 * Handle for the DHT PUT operation. 112 * Handle for the DHT PUT operation.
108 */ 113 */
109 struct GNUNET_DHT_PutHandle *ph; 114 struct GNUNET_DHT_PutHandle *ph;
115
116 /**
117 * When was this PUT initiated?
118 */
119 struct GNUNET_TIME_Absolute start_date;
110}; 120};
111 121
112 122
@@ -161,6 +171,11 @@ static struct DhtPutActivity *it_head;
161static struct DhtPutActivity *it_tail; 171static struct DhtPutActivity *it_tail;
162 172
163/** 173/**
174 * Number of entries in the DHT queue.
175 */
176static unsigned int dht_queue_length;
177
178/**
164 * Useful for zone update for DHT put 179 * Useful for zone update for DHT put
165 */ 180 */
166static unsigned long long num_public_records; 181static unsigned long long num_public_records;
@@ -265,6 +280,15 @@ shutdown_task (void *cls)
265 ma); 280 ma);
266 GNUNET_free (ma); 281 GNUNET_free (ma);
267 } 282 }
283 while (NULL != (ma = it_head))
284 {
285 GNUNET_DHT_put_cancel (ma->ph);
286 GNUNET_CONTAINER_DLL_remove (it_head,
287 it_tail,
288 ma);
289 dht_queue_length--;
290 GNUNET_free (ma);
291 }
268 if (NULL != statistics) 292 if (NULL != statistics)
269 { 293 {
270 GNUNET_STATISTICS_destroy (statistics, 294 GNUNET_STATISTICS_destroy (statistics,
@@ -362,8 +386,6 @@ check_zone_dht_next ()
362 386
363 if (0 != ns_iteration_left) 387 if (0 != ns_iteration_left)
364 return; /* current NAMESTORE iteration not yet done */ 388 return; /* current NAMESTORE iteration not yet done */
365 if (NULL != it_head)
366 return; /* waiting on DHT */
367 delay = GNUNET_TIME_relative_subtract (next_put_interval, 389 delay = GNUNET_TIME_relative_subtract (next_put_interval,
368 sub_delta); 390 sub_delta);
369 /* We delay *once* per #NS_BLOCK_SIZE, so we need to multiply the 391 /* We delay *once* per #NS_BLOCK_SIZE, so we need to multiply the
@@ -472,6 +494,10 @@ update_velocity ()
472 } 494 }
473 } 495 }
474 GNUNET_STATISTICS_set (statistics, 496 GNUNET_STATISTICS_set (statistics,
497 "# size of the DHT queue",
498 dht_queue_length,
499 GNUNET_NO);
500 GNUNET_STATISTICS_set (statistics,
475 "% speed increase needed for target velocity", 501 "% speed increase needed for target velocity",
476 pct, 502 pct,
477 GNUNET_NO); 503 GNUNET_NO);
@@ -498,6 +524,7 @@ dht_put_continuation (void *cls,
498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
499 "PUT complete (%s)\n", 525 "PUT complete (%s)\n",
500 (GNUNET_OK == success) ? "success" : "failure"); 526 (GNUNET_OK == success) ? "success" : "failure");
527 dht_queue_length--;
501 GNUNET_CONTAINER_DLL_remove (it_head, 528 GNUNET_CONTAINER_DLL_remove (it_head,
502 it_tail, 529 it_tail,
503 ma); 530 ma);
@@ -508,7 +535,6 @@ dht_put_continuation (void *cls,
508 if (0 == put_cnt % DELTA_INTERVAL) 535 if (0 == put_cnt % DELTA_INTERVAL)
509 update_velocity (); 536 update_velocity ();
510 } 537 }
511 check_zone_dht_next ();
512} 538}
513 539
514 540
@@ -766,6 +792,7 @@ put_gns_record (void *cls,
766 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 792 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767 "Starting DHT PUT\n"); 793 "Starting DHT PUT\n");
768 ma = GNUNET_new (struct DhtPutActivity); 794 ma = GNUNET_new (struct DhtPutActivity);
795 ma->start_date = GNUNET_TIME_absolute_get ();
769 ma->ph = perform_dht_put (key, 796 ma->ph = perform_dht_put (key,
770 label, 797 label,
771 rd_public, 798 rd_public,
@@ -780,9 +807,24 @@ put_gns_record (void *cls,
780 check_zone_dht_next (); 807 check_zone_dht_next ();
781 return; 808 return;
782 } 809 }
783 GNUNET_CONTAINER_DLL_insert (it_head, 810 dht_queue_length++;
784 it_tail, 811 GNUNET_CONTAINER_DLL_insert_tail (it_head,
785 ma); 812 it_tail,
813 ma);
814 if (dht_queue_length > DHT_QUEUE_LIMIT)
815 {
816 ma = it_head;
817 GNUNET_CONTAINER_DLL_remove (it_head,
818 it_tail,
819 ma);
820 GNUNET_DHT_put_cancel (ma->ph);
821 dht_queue_length--;
822 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
823 "DHT PUT unconfirmed after %s, aborting PUT\n",
824 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (ma->start_date),
825 GNUNET_YES));
826 GNUNET_free (ma);
827 }
786} 828}
787 829
788 830
@@ -815,6 +857,7 @@ publish_zone_dht_start (void *cls)
815 NULL, 857 NULL,
816 &zone_iteration_finished, 858 &zone_iteration_finished,
817 NULL); 859 NULL);
860 GNUNET_assert (NULL != namestore_iter);
818} 861}
819 862
820 863
@@ -855,6 +898,7 @@ handle_monitor_event (void *cls,
855 if (0 == rd_public_count) 898 if (0 == rd_public_count)
856 return; /* nothing to do */ 899 return; /* nothing to do */
857 ma = GNUNET_new (struct DhtPutActivity); 900 ma = GNUNET_new (struct DhtPutActivity);
901 ma->start_date = GNUNET_TIME_absolute_get ();
858 ma->ph = perform_dht_put (zone, 902 ma->ph = perform_dht_put (zone,
859 label, 903 label,
860 rd, 904 rd,