diff options
author | Martin Schanzenbach <schanzen@gnunet.org> | 2022-10-26 12:13:56 +0900 |
---|---|---|
committer | Martin Schanzenbach <schanzen@gnunet.org> | 2022-10-26 12:13:56 +0900 |
commit | 85b690809e76aaddb83fd9be96a69426706ec617 (patch) | |
tree | c804d4547b739cfc58c62339421544e28d11a610 /src/zonemaster/gnunet-service-zonemaster.c | |
parent | 5d116312a08aa5b76fc0075fb9a244487dd1fb68 (diff) | |
download | gnunet-85b690809e76aaddb83fd9be96a69426706ec617.tar.gz gnunet-85b690809e76aaddb83fd9be96a69426706ec617.zip |
-improve job queue handling
Diffstat (limited to 'src/zonemaster/gnunet-service-zonemaster.c')
-rw-r--r-- | src/zonemaster/gnunet-service-zonemaster.c | 146 |
1 files changed, 79 insertions, 67 deletions
diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c index 09053a676..863716a44 100644 --- a/src/zonemaster/gnunet-service-zonemaster.c +++ b/src/zonemaster/gnunet-service-zonemaster.c | |||
@@ -57,15 +57,9 @@ | |||
57 | #define NS_BLOCK_SIZE 1000 | 57 | #define NS_BLOCK_SIZE 1000 |
58 | 58 | ||
59 | /** | 59 | /** |
60 | * How many pending DHT operations do we allow at most? | 60 | * How many open jobs (and with it maximum amount of pending DHT operations) do we allow at most |
61 | */ | 61 | */ |
62 | #define DHT_QUEUE_LIMIT 5000 | 62 | #define JOB_QUEUE_LIMIT 5000 |
63 | |||
64 | /** | ||
65 | * How many events may the namestore give us before it has to wait | ||
66 | * for us to keep up? | ||
67 | */ | ||
68 | #define NAMESTORE_QUEUE_LIMIT 50 | ||
69 | 63 | ||
70 | /** | 64 | /** |
71 | * How many events may the namestore give us before it has to wait | 65 | * How many events may the namestore give us before it has to wait |
@@ -123,6 +117,16 @@ static pthread_cond_t empty_jobs; | |||
123 | static int in_shutdown = GNUNET_NO; | 117 | static int in_shutdown = GNUNET_NO; |
124 | 118 | ||
125 | /** | 119 | /** |
120 | * Iterator halted? | ||
121 | */ | ||
122 | static int iterator_halted = GNUNET_NO; | ||
123 | |||
124 | /** | ||
125 | * Monitor halted? | ||
126 | */ | ||
127 | static int monitor_halted = GNUNET_NO; | ||
128 | |||
129 | /** | ||
126 | * Our notification pipe | 130 | * Our notification pipe |
127 | */ | 131 | */ |
128 | static struct GNUNET_DISK_PipeHandle *notification_pipe; | 132 | static struct GNUNET_DISK_PipeHandle *notification_pipe; |
@@ -274,9 +278,9 @@ static struct DhtPutActivity *it_head; | |||
274 | static struct DhtPutActivity *it_tail; | 278 | static struct DhtPutActivity *it_tail; |
275 | 279 | ||
276 | /** | 280 | /** |
277 | * Number of entries in the DHT queue #it_head. | 281 | * Number of entries in the job queue #jobs_head. |
278 | */ | 282 | */ |
279 | static unsigned int dht_queue_length; | 283 | static unsigned int job_queue_length; |
280 | 284 | ||
281 | /** | 285 | /** |
282 | * Useful for zone update for DHT put | 286 | * Useful for zone update for DHT put |
@@ -419,6 +423,7 @@ shutdown_task (void *cls) | |||
419 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 423 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
420 | "Removing incomplete jobs\n"); | 424 | "Removing incomplete jobs\n"); |
421 | GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job); | 425 | GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job); |
426 | job_queue_length--; | ||
422 | free_job (job); | 427 | free_job (job); |
423 | } | 428 | } |
424 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); | 429 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); |
@@ -439,7 +444,6 @@ shutdown_task (void *cls) | |||
439 | GNUNET_CONTAINER_DLL_remove (it_head, | 444 | GNUNET_CONTAINER_DLL_remove (it_head, |
440 | it_tail, | 445 | it_tail, |
441 | ma); | 446 | ma); |
442 | dht_queue_length--; | ||
443 | GNUNET_free (ma); | 447 | GNUNET_free (ma); |
444 | } | 448 | } |
445 | if (NULL != statistics) | 449 | if (NULL != statistics) |
@@ -706,8 +710,8 @@ update_velocity (unsigned int cnt) | |||
706 | } | 710 | } |
707 | } | 711 | } |
708 | GNUNET_STATISTICS_set (statistics, | 712 | GNUNET_STATISTICS_set (statistics, |
709 | "# size of the DHT queue (it)", | 713 | "# dispatched jobs", |
710 | dht_queue_length, | 714 | job_queue_length, |
711 | GNUNET_NO); | 715 | GNUNET_NO); |
712 | GNUNET_STATISTICS_set (statistics, | 716 | GNUNET_STATISTICS_set (statistics, |
713 | "% speed increase needed for target velocity", | 717 | "% speed increase needed for target velocity", |
@@ -769,14 +773,21 @@ dht_put_continuation (void *cls) | |||
769 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 773 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
770 | "PUT complete\n"); | 774 | "PUT complete\n"); |
771 | /* When we just fall under the limit, trigger monitor/iterator again | 775 | /* When we just fall under the limit, trigger monitor/iterator again |
772 | * creating a race condition, but we may actually have finished more | 776 | * if halted. We can only safely trigger one, prefer iterator. */ |
773 | * PUTs by the time they come back and both can carry on */ | 777 | if (job_queue_length <= JOB_QUEUE_LIMIT) |
774 | if (dht_queue_length == DHT_QUEUE_LIMIT) | ||
775 | { | 778 | { |
776 | GNUNET_NAMESTORE_zone_monitor_next (zmon, 1); | 779 | if (GNUNET_YES == iterator_halted) |
777 | GNUNET_NAMESTORE_zone_iterator_next (namestore_iter, 1); | 780 | { |
781 | GNUNET_NAMESTORE_zone_iterator_next (namestore_iter, 1); | ||
782 | iterator_halted = GNUNET_NO; | ||
783 | } | ||
784 | else if (GNUNET_YES == monitor_halted) | ||
785 | { | ||
786 | GNUNET_NAMESTORE_zone_monitor_next (zmon, 1); | ||
787 | monitor_halted = GNUNET_NO; | ||
788 | } | ||
778 | } | 789 | } |
779 | dht_queue_length--; | 790 | job_queue_length--; |
780 | GNUNET_CONTAINER_DLL_remove (it_head, | 791 | GNUNET_CONTAINER_DLL_remove (it_head, |
781 | it_tail, | 792 | it_tail, |
782 | ma); | 793 | ma); |
@@ -795,12 +806,12 @@ dht_put_continuation (void *cls) | |||
795 | * @return DHT PUT handle, NULL on error | 806 | * @return DHT PUT handle, NULL on error |
796 | */ | 807 | */ |
797 | static void | 808 | static void |
798 | perform_dht_put (const struct GNUNET_IDENTITY_PrivateKey *key, | 809 | dispatch_job (const struct GNUNET_IDENTITY_PrivateKey *key, |
799 | const char *label, | 810 | const char *label, |
800 | const struct GNUNET_GNSRECORD_Data *rd, | 811 | const struct GNUNET_GNSRECORD_Data *rd, |
801 | unsigned int rd_count, | 812 | unsigned int rd_count, |
802 | const struct GNUNET_TIME_Absolute expire, | 813 | const struct GNUNET_TIME_Absolute expire, |
803 | struct DhtPutActivity *ma) | 814 | struct DhtPutActivity *ma) |
804 | { | 815 | { |
805 | struct GNUNET_GNSRECORD_Data rd_public[rd_count]; | 816 | struct GNUNET_GNSRECORD_Data rd_public[rd_count]; |
806 | struct GNUNET_GNSRECORD_Block *block; | 817 | struct GNUNET_GNSRECORD_Block *block; |
@@ -1024,12 +1035,12 @@ zone_iteration_finished (void *cls) | |||
1024 | * @param rd the record data | 1035 | * @param rd the record data |
1025 | */ | 1036 | */ |
1026 | static void | 1037 | static void |
1027 | put_gns_record (void *cls, | 1038 | handle_record (void *cls, |
1028 | const struct GNUNET_IDENTITY_PrivateKey *key, | 1039 | const struct GNUNET_IDENTITY_PrivateKey *key, |
1029 | const char *label, | 1040 | const char *label, |
1030 | unsigned int rd_count, | 1041 | unsigned int rd_count, |
1031 | const struct GNUNET_GNSRECORD_Data *rd, | 1042 | const struct GNUNET_GNSRECORD_Data *rd, |
1032 | struct GNUNET_TIME_Absolute expire) | 1043 | struct GNUNET_TIME_Absolute expire) |
1033 | { | 1044 | { |
1034 | struct DhtPutActivity *ma; | 1045 | struct DhtPutActivity *ma; |
1035 | 1046 | ||
@@ -1061,26 +1072,26 @@ put_gns_record (void *cls, | |||
1061 | put_cnt++; | 1072 | put_cnt++; |
1062 | if (0 == put_cnt % DELTA_INTERVAL) | 1073 | if (0 == put_cnt % DELTA_INTERVAL) |
1063 | update_velocity (DELTA_INTERVAL); | 1074 | update_velocity (DELTA_INTERVAL); |
1064 | if (dht_queue_length >= DHT_QUEUE_LIMIT) | 1075 | ma = GNUNET_new (struct DhtPutActivity); |
1076 | dispatch_job (key, | ||
1077 | label, | ||
1078 | rd, | ||
1079 | rd_count, | ||
1080 | expire, | ||
1081 | ma); | ||
1082 | GNUNET_CONTAINER_DLL_insert_tail (it_head, | ||
1083 | it_tail, | ||
1084 | ma); | ||
1085 | job_queue_length++; | ||
1086 | if (job_queue_length >= JOB_QUEUE_LIMIT) | ||
1065 | { | 1087 | { |
1066 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1088 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1067 | "DHT PUT queue length exceeded (%u), aborting PUT\n", | 1089 | "Job queue length exceeded (%u). Halting namestore iteration.\n", |
1068 | DHT_QUEUE_LIMIT); | 1090 | JOB_QUEUE_LIMIT); |
1091 | iterator_halted = GNUNET_YES; | ||
1069 | return; | 1092 | return; |
1070 | } | 1093 | } |
1071 | check_zone_namestore_next (); | 1094 | check_zone_namestore_next (); |
1072 | |||
1073 | ma = GNUNET_new (struct DhtPutActivity); | ||
1074 | perform_dht_put (key, | ||
1075 | label, | ||
1076 | rd, | ||
1077 | rd_count, | ||
1078 | expire, | ||
1079 | ma); | ||
1080 | dht_queue_length++; | ||
1081 | GNUNET_CONTAINER_DLL_insert_tail (it_head, | ||
1082 | it_tail, | ||
1083 | ma); | ||
1084 | } | 1095 | } |
1085 | 1096 | ||
1086 | 1097 | ||
@@ -1109,7 +1120,7 @@ publish_zone_dht_start (void *cls) | |||
1109 | NULL, /* All zones */ | 1120 | NULL, /* All zones */ |
1110 | &zone_iteration_error, | 1121 | &zone_iteration_error, |
1111 | NULL, | 1122 | NULL, |
1112 | &put_gns_record, | 1123 | &handle_record, |
1113 | NULL, | 1124 | NULL, |
1114 | &zone_iteration_finished, | 1125 | &zone_iteration_finished, |
1115 | NULL, | 1126 | NULL, |
@@ -1129,12 +1140,12 @@ publish_zone_dht_start (void *cls) | |||
1129 | * @return DHT PUT handle, NULL on error | 1140 | * @return DHT PUT handle, NULL on error |
1130 | */ | 1141 | */ |
1131 | static void | 1142 | static void |
1132 | perform_dht_put_monitor (const struct GNUNET_IDENTITY_PrivateKey *key, | 1143 | dispatch_job_monitor (const struct GNUNET_IDENTITY_PrivateKey *key, |
1133 | const char *label, | 1144 | const char *label, |
1134 | const struct GNUNET_GNSRECORD_Data *rd, | 1145 | const struct GNUNET_GNSRECORD_Data *rd, |
1135 | unsigned int rd_count, | 1146 | unsigned int rd_count, |
1136 | struct GNUNET_TIME_Absolute expire, | 1147 | struct GNUNET_TIME_Absolute expire, |
1137 | struct DhtPutActivity *ma) | 1148 | struct DhtPutActivity *ma) |
1138 | { | 1149 | { |
1139 | struct GNUNET_GNSRECORD_Data rd_public[rd_count]; | 1150 | struct GNUNET_GNSRECORD_Data rd_public[rd_count]; |
1140 | struct GNUNET_GNSRECORD_Block *block; | 1151 | struct GNUNET_GNSRECORD_Block *block; |
@@ -1232,26 +1243,27 @@ handle_monitor_event (void *cls, | |||
1232 | 1); | 1243 | 1); |
1233 | return; /* nothing to do */ | 1244 | return; /* nothing to do */ |
1234 | } | 1245 | } |
1235 | if (dht_queue_length >= DHT_QUEUE_LIMIT) | 1246 | ma = GNUNET_new (struct DhtPutActivity); |
1247 | dispatch_job_monitor (zone, | ||
1248 | label, | ||
1249 | rd, | ||
1250 | rd_count, | ||
1251 | expire, | ||
1252 | ma); | ||
1253 | GNUNET_CONTAINER_DLL_insert_tail (it_head, | ||
1254 | it_tail, | ||
1255 | ma); | ||
1256 | job_queue_length++; | ||
1257 | if (job_queue_length >= JOB_QUEUE_LIMIT) | ||
1236 | { | 1258 | { |
1237 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1259 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1238 | "DHT PUT queue length exceeded (%u), aborting PUT\n", | 1260 | "Job queue length exceeded (%u). Halting monitor.\n", |
1239 | DHT_QUEUE_LIMIT); | 1261 | JOB_QUEUE_LIMIT); |
1262 | monitor_halted = GNUNET_YES; | ||
1240 | return; | 1263 | return; |
1241 | } | 1264 | } |
1242 | ma = GNUNET_new (struct DhtPutActivity); | ||
1243 | perform_dht_put_monitor (zone, | ||
1244 | label, | ||
1245 | rd, | ||
1246 | rd_count, | ||
1247 | expire, | ||
1248 | ma); | ||
1249 | GNUNET_NAMESTORE_zone_monitor_next (zmon, | 1265 | GNUNET_NAMESTORE_zone_monitor_next (zmon, |
1250 | 1); | 1266 | 1); |
1251 | GNUNET_CONTAINER_DLL_insert_tail (it_head, | ||
1252 | it_tail, | ||
1253 | ma); | ||
1254 | dht_queue_length++; | ||
1255 | } | 1267 | } |
1256 | 1268 | ||
1257 | 1269 | ||