aboutsummaryrefslogtreecommitdiff
path: root/src/zonemaster/gnunet-service-zonemaster.c
diff options
context:
space:
mode:
authorMartin Schanzenbach <schanzen@gnunet.org>2022-10-26 12:13:56 +0900
committerMartin Schanzenbach <schanzen@gnunet.org>2022-10-26 12:13:56 +0900
commit85b690809e76aaddb83fd9be96a69426706ec617 (patch)
treec804d4547b739cfc58c62339421544e28d11a610 /src/zonemaster/gnunet-service-zonemaster.c
parent5d116312a08aa5b76fc0075fb9a244487dd1fb68 (diff)
downloadgnunet-85b690809e76aaddb83fd9be96a69426706ec617.tar.gz
gnunet-85b690809e76aaddb83fd9be96a69426706ec617.zip
-improve job queue handling
Diffstat (limited to 'src/zonemaster/gnunet-service-zonemaster.c')
-rw-r--r--src/zonemaster/gnunet-service-zonemaster.c146
1 files changed, 79 insertions, 67 deletions
diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c
index 09053a676..863716a44 100644
--- a/src/zonemaster/gnunet-service-zonemaster.c
+++ b/src/zonemaster/gnunet-service-zonemaster.c
@@ -57,15 +57,9 @@
57#define NS_BLOCK_SIZE 1000 57#define NS_BLOCK_SIZE 1000
58 58
59/** 59/**
60 * How many pending DHT operations do we allow at most? 60 * How many open jobs (and with it maximum amount of pending DHT operations) do we allow at most
61 */ 61 */
62#define DHT_QUEUE_LIMIT 5000 62#define JOB_QUEUE_LIMIT 5000
63
64/**
65 * How many events may the namestore give us before it has to wait
66 * for us to keep up?
67 */
68#define NAMESTORE_QUEUE_LIMIT 50
69 63
70/** 64/**
71 * How many events may the namestore give us before it has to wait 65 * How many events may the namestore give us before it has to wait
@@ -123,6 +117,16 @@ static pthread_cond_t empty_jobs;
123static int in_shutdown = GNUNET_NO; 117static int in_shutdown = GNUNET_NO;
124 118
125/** 119/**
120 * Iterator halted?
121 */
122static int iterator_halted = GNUNET_NO;
123
124/**
125 * Monitor halted?
126 */
127static int monitor_halted = GNUNET_NO;
128
129/**
126 * Our notification pipe 130 * Our notification pipe
127 */ 131 */
128static struct GNUNET_DISK_PipeHandle *notification_pipe; 132static struct GNUNET_DISK_PipeHandle *notification_pipe;
@@ -274,9 +278,9 @@ static struct DhtPutActivity *it_head;
274static struct DhtPutActivity *it_tail; 278static struct DhtPutActivity *it_tail;
275 279
276/** 280/**
277 * Number of entries in the DHT queue #it_head. 281 * Number of entries in the job queue #jobs_head.
278 */ 282 */
279static unsigned int dht_queue_length; 283static unsigned int job_queue_length;
280 284
281/** 285/**
282 * Useful for zone update for DHT put 286 * Useful for zone update for DHT put
@@ -419,6 +423,7 @@ shutdown_task (void *cls)
419 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 423 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
420 "Removing incomplete jobs\n"); 424 "Removing incomplete jobs\n");
421 GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job); 425 GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job);
426 job_queue_length--;
422 free_job (job); 427 free_job (job);
423 } 428 }
424 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); 429 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
@@ -439,7 +444,6 @@ shutdown_task (void *cls)
439 GNUNET_CONTAINER_DLL_remove (it_head, 444 GNUNET_CONTAINER_DLL_remove (it_head,
440 it_tail, 445 it_tail,
441 ma); 446 ma);
442 dht_queue_length--;
443 GNUNET_free (ma); 447 GNUNET_free (ma);
444 } 448 }
445 if (NULL != statistics) 449 if (NULL != statistics)
@@ -706,8 +710,8 @@ update_velocity (unsigned int cnt)
706 } 710 }
707 } 711 }
708 GNUNET_STATISTICS_set (statistics, 712 GNUNET_STATISTICS_set (statistics,
709 "# size of the DHT queue (it)", 713 "# dispatched jobs",
710 dht_queue_length, 714 job_queue_length,
711 GNUNET_NO); 715 GNUNET_NO);
712 GNUNET_STATISTICS_set (statistics, 716 GNUNET_STATISTICS_set (statistics,
713 "% speed increase needed for target velocity", 717 "% speed increase needed for target velocity",
@@ -769,14 +773,21 @@ dht_put_continuation (void *cls)
769 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 773 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
770 "PUT complete\n"); 774 "PUT complete\n");
771 /* When we just fall under the limit, trigger monitor/iterator again 775 /* When we just fall under the limit, trigger monitor/iterator again
772 * creating a race condition, but we may actually have finished more 776 * if halted. We can only safely trigger one, prefer iterator. */
773 * PUTs by the time they come back and both can carry on */ 777 if (job_queue_length <= JOB_QUEUE_LIMIT)
774 if (dht_queue_length == DHT_QUEUE_LIMIT)
775 { 778 {
776 GNUNET_NAMESTORE_zone_monitor_next (zmon, 1); 779 if (GNUNET_YES == iterator_halted)
777 GNUNET_NAMESTORE_zone_iterator_next (namestore_iter, 1); 780 {
781 GNUNET_NAMESTORE_zone_iterator_next (namestore_iter, 1);
782 iterator_halted = GNUNET_NO;
783 }
784 else if (GNUNET_YES == monitor_halted)
785 {
786 GNUNET_NAMESTORE_zone_monitor_next (zmon, 1);
787 monitor_halted = GNUNET_NO;
788 }
778 } 789 }
779 dht_queue_length--; 790 job_queue_length--;
780 GNUNET_CONTAINER_DLL_remove (it_head, 791 GNUNET_CONTAINER_DLL_remove (it_head,
781 it_tail, 792 it_tail,
782 ma); 793 ma);
@@ -795,12 +806,12 @@ dht_put_continuation (void *cls)
795 * @return DHT PUT handle, NULL on error 806 * @return DHT PUT handle, NULL on error
796 */ 807 */
797static void 808static void
798perform_dht_put (const struct GNUNET_IDENTITY_PrivateKey *key, 809dispatch_job (const struct GNUNET_IDENTITY_PrivateKey *key,
799 const char *label, 810 const char *label,
800 const struct GNUNET_GNSRECORD_Data *rd, 811 const struct GNUNET_GNSRECORD_Data *rd,
801 unsigned int rd_count, 812 unsigned int rd_count,
802 const struct GNUNET_TIME_Absolute expire, 813 const struct GNUNET_TIME_Absolute expire,
803 struct DhtPutActivity *ma) 814 struct DhtPutActivity *ma)
804{ 815{
805 struct GNUNET_GNSRECORD_Data rd_public[rd_count]; 816 struct GNUNET_GNSRECORD_Data rd_public[rd_count];
806 struct GNUNET_GNSRECORD_Block *block; 817 struct GNUNET_GNSRECORD_Block *block;
@@ -1024,12 +1035,12 @@ zone_iteration_finished (void *cls)
1024 * @param rd the record data 1035 * @param rd the record data
1025 */ 1036 */
1026static void 1037static void
1027put_gns_record (void *cls, 1038handle_record (void *cls,
1028 const struct GNUNET_IDENTITY_PrivateKey *key, 1039 const struct GNUNET_IDENTITY_PrivateKey *key,
1029 const char *label, 1040 const char *label,
1030 unsigned int rd_count, 1041 unsigned int rd_count,
1031 const struct GNUNET_GNSRECORD_Data *rd, 1042 const struct GNUNET_GNSRECORD_Data *rd,
1032 struct GNUNET_TIME_Absolute expire) 1043 struct GNUNET_TIME_Absolute expire)
1033{ 1044{
1034 struct DhtPutActivity *ma; 1045 struct DhtPutActivity *ma;
1035 1046
@@ -1061,26 +1072,26 @@ put_gns_record (void *cls,
1061 put_cnt++; 1072 put_cnt++;
1062 if (0 == put_cnt % DELTA_INTERVAL) 1073 if (0 == put_cnt % DELTA_INTERVAL)
1063 update_velocity (DELTA_INTERVAL); 1074 update_velocity (DELTA_INTERVAL);
1064 if (dht_queue_length >= DHT_QUEUE_LIMIT) 1075 ma = GNUNET_new (struct DhtPutActivity);
1076 dispatch_job (key,
1077 label,
1078 rd,
1079 rd_count,
1080 expire,
1081 ma);
1082 GNUNET_CONTAINER_DLL_insert_tail (it_head,
1083 it_tail,
1084 ma);
1085 job_queue_length++;
1086 if (job_queue_length >= JOB_QUEUE_LIMIT)
1065 { 1087 {
1066 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1088 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1067 "DHT PUT queue length exceeded (%u), aborting PUT\n", 1089 "Job queue length exceeded (%u). Halting namestore iteration.\n",
1068 DHT_QUEUE_LIMIT); 1090 JOB_QUEUE_LIMIT);
1091 iterator_halted = GNUNET_YES;
1069 return; 1092 return;
1070 } 1093 }
1071 check_zone_namestore_next (); 1094 check_zone_namestore_next ();
1072
1073 ma = GNUNET_new (struct DhtPutActivity);
1074 perform_dht_put (key,
1075 label,
1076 rd,
1077 rd_count,
1078 expire,
1079 ma);
1080 dht_queue_length++;
1081 GNUNET_CONTAINER_DLL_insert_tail (it_head,
1082 it_tail,
1083 ma);
1084} 1095}
1085 1096
1086 1097
@@ -1109,7 +1120,7 @@ publish_zone_dht_start (void *cls)
1109 NULL, /* All zones */ 1120 NULL, /* All zones */
1110 &zone_iteration_error, 1121 &zone_iteration_error,
1111 NULL, 1122 NULL,
1112 &put_gns_record, 1123 &handle_record,
1113 NULL, 1124 NULL,
1114 &zone_iteration_finished, 1125 &zone_iteration_finished,
1115 NULL, 1126 NULL,
@@ -1129,12 +1140,12 @@ publish_zone_dht_start (void *cls)
1129 * @return DHT PUT handle, NULL on error 1140 * @return DHT PUT handle, NULL on error
1130 */ 1141 */
1131static void 1142static void
1132perform_dht_put_monitor (const struct GNUNET_IDENTITY_PrivateKey *key, 1143dispatch_job_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
1133 const char *label, 1144 const char *label,
1134 const struct GNUNET_GNSRECORD_Data *rd, 1145 const struct GNUNET_GNSRECORD_Data *rd,
1135 unsigned int rd_count, 1146 unsigned int rd_count,
1136 struct GNUNET_TIME_Absolute expire, 1147 struct GNUNET_TIME_Absolute expire,
1137 struct DhtPutActivity *ma) 1148 struct DhtPutActivity *ma)
1138{ 1149{
1139 struct GNUNET_GNSRECORD_Data rd_public[rd_count]; 1150 struct GNUNET_GNSRECORD_Data rd_public[rd_count];
1140 struct GNUNET_GNSRECORD_Block *block; 1151 struct GNUNET_GNSRECORD_Block *block;
@@ -1232,26 +1243,27 @@ handle_monitor_event (void *cls,
1232 1); 1243 1);
1233 return; /* nothing to do */ 1244 return; /* nothing to do */
1234 } 1245 }
1235 if (dht_queue_length >= DHT_QUEUE_LIMIT) 1246 ma = GNUNET_new (struct DhtPutActivity);
1247 dispatch_job_monitor (zone,
1248 label,
1249 rd,
1250 rd_count,
1251 expire,
1252 ma);
1253 GNUNET_CONTAINER_DLL_insert_tail (it_head,
1254 it_tail,
1255 ma);
1256 job_queue_length++;
1257 if (job_queue_length >= JOB_QUEUE_LIMIT)
1236 { 1258 {
1237 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1259 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1238 "DHT PUT queue length exceeded (%u), aborting PUT\n", 1260 "Job queue length exceeded (%u). Halting monitor.\n",
1239 DHT_QUEUE_LIMIT); 1261 JOB_QUEUE_LIMIT);
1262 monitor_halted = GNUNET_YES;
1240 return; 1263 return;
1241 } 1264 }
1242 ma = GNUNET_new (struct DhtPutActivity);
1243 perform_dht_put_monitor (zone,
1244 label,
1245 rd,
1246 rd_count,
1247 expire,
1248 ma);
1249 GNUNET_NAMESTORE_zone_monitor_next (zmon, 1265 GNUNET_NAMESTORE_zone_monitor_next (zmon,
1250 1); 1266 1);
1251 GNUNET_CONTAINER_DLL_insert_tail (it_head,
1252 it_tail,
1253 ma);
1254 dht_queue_length++;
1255} 1267}
1256 1268
1257 1269