diff options
author | Martin Schanzenbach <schanzen@gnunet.org> | 2022-10-20 17:31:48 +0900 |
---|---|---|
committer | Martin Schanzenbach <schanzen@gnunet.org> | 2022-10-20 17:31:48 +0900 |
commit | 01b5953cb3d3c7f072115ffa7b72884c3614cbae (patch) | |
tree | 9e024e600f962dc9910de4205d2852374e9e2d86 /src/zonemaster/gnunet-service-zonemaster.c | |
parent | 64aefd7b6fb27b8625af12783201f3c87da41f47 (diff) | |
download | gnunet-01b5953cb3d3c7f072115ffa7b72884c3614cbae.tar.gz gnunet-01b5953cb3d3c7f072115ffa7b72884c3614cbae.zip |
-fix properly emptying queue; add parallelization to monitor
Diffstat (limited to 'src/zonemaster/gnunet-service-zonemaster.c')
-rw-r--r-- | src/zonemaster/gnunet-service-zonemaster.c | 185 |
1 files changed, 68 insertions, 117 deletions
diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c index 42b3abf91..fb55fd718 100644 --- a/src/zonemaster/gnunet-service-zonemaster.c +++ b/src/zonemaster/gnunet-service-zonemaster.c | |||
@@ -423,7 +423,8 @@ shutdown_task (void *cls) | |||
423 | } | 423 | } |
424 | while (NULL != (ma = ma_head)) | 424 | while (NULL != (ma = ma_head)) |
425 | { | 425 | { |
426 | GNUNET_DHT_put_cancel (ma->ph); | 426 | if (NULL != ma->ph) |
427 | GNUNET_DHT_put_cancel (ma->ph); | ||
427 | ma_queue_length--; | 428 | ma_queue_length--; |
428 | GNUNET_CONTAINER_DLL_remove (ma_head, | 429 | GNUNET_CONTAINER_DLL_remove (ma_head, |
429 | ma_tail, | 430 | ma_tail, |
@@ -818,36 +819,25 @@ perform_dht_put (const struct GNUNET_IDENTITY_PrivateKey *key, | |||
818 | GNUNET_free (emsg); | 819 | GNUNET_free (emsg); |
819 | } | 820 | } |
820 | 821 | ||
821 | if (cache_keys) | 822 | GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key, |
822 | { | 823 | expire_pub, |
823 | GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create2 (key, | 824 | label, |
824 | expire_pub, | 825 | rd_public, |
825 | label, | 826 | rd_public_count, |
826 | rd_public, | 827 | &block)); |
827 | rd_public_count, | ||
828 | &block)); | ||
829 | } | ||
830 | else | ||
831 | { | ||
832 | GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key, | ||
833 | expire_pub, | ||
834 | label, | ||
835 | rd_public, | ||
836 | rd_public_count, | ||
837 | &block)); | ||
838 | } | ||
839 | if (NULL == block) | 828 | if (NULL == block) |
840 | { | 829 | { |
841 | GNUNET_break (0); | 830 | GNUNET_break (0); |
842 | return NULL; /* whoops */ | 831 | return NULL; /* whoops */ |
843 | } | 832 | } |
844 | if (rd_count != rd_public_count) | 833 | if (rd_count != rd_public_count) |
845 | GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key, | 834 | GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key, |
846 | expire, | 835 | expire, |
847 | label, | 836 | label, |
848 | rd, | 837 | rd, |
849 | rd_count, | 838 | rd_count, |
850 | &block_priv)); | 839 | & |
840 | block_priv)); | ||
851 | else | 841 | else |
852 | block_priv = block; | 842 | block_priv = block; |
853 | block_size = GNUNET_GNSRECORD_block_get_size (block); | 843 | block_size = GNUNET_GNSRECORD_block_get_size (block); |
@@ -879,22 +869,31 @@ initiate_put_from_pipe_trigger (void *cls) | |||
879 | { | 869 | { |
880 | struct GNUNET_HashCode query; | 870 | struct GNUNET_HashCode query; |
881 | struct OpenSignJob *job; | 871 | struct OpenSignJob *job; |
872 | const struct GNUNET_DISK_FileHandle *np_fh; | ||
873 | char buf[100]; | ||
874 | ssize_t nf_count; | ||
882 | 875 | ||
883 | pipe_read_task = NULL; | 876 | pipe_read_task = NULL; |
884 | GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); | 877 | GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); |
885 | job = results_head; | 878 | job = results_head; |
879 | np_fh = GNUNET_DISK_pipe_handle (notification_pipe, | ||
880 | GNUNET_DISK_PIPE_END_READ); | ||
881 | pipe_read_task = | ||
882 | GNUNET_SCHEDULER_add_read_file ( | ||
883 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
884 | np_fh, | ||
885 | notification_pipe_cb, | ||
886 | NULL); | ||
887 | /* empty queue */ | ||
888 | while (GNUNET_SYSERR != | ||
889 | (nf_count = GNUNET_DISK_file_read (np_fh, buf, sizeof (buf)))) | ||
890 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Read %lld notifications from pipe\n", | ||
891 | nf_count); | ||
886 | if (NULL == job) | 892 | if (NULL == job) |
887 | { | 893 | { |
894 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
895 | "Hmm... no results. Back to sleep.\n"); | ||
888 | GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); | 896 | GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); |
889 | const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle ( | ||
890 | notification_pipe, | ||
891 | GNUNET_DISK_PIPE_END_READ); | ||
892 | pipe_read_task = | ||
893 | GNUNET_SCHEDULER_add_read_file ( | ||
894 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
895 | np_fh, | ||
896 | notification_pipe_cb, | ||
897 | NULL); | ||
898 | return; | 897 | return; |
899 | } | 898 | } |
900 | GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job); | 899 | GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job); |
@@ -1119,8 +1118,6 @@ dht_put_monitor_continuation (void *cls) | |||
1119 | { | 1118 | { |
1120 | struct DhtPutActivity *ma = cls; | 1119 | struct DhtPutActivity *ma = cls; |
1121 | 1120 | ||
1122 | GNUNET_NAMESTORE_zone_monitor_next (zmon, | ||
1123 | 1); | ||
1124 | ma_queue_length--; | 1121 | ma_queue_length--; |
1125 | GNUNET_CONTAINER_DLL_remove (ma_head, | 1122 | GNUNET_CONTAINER_DLL_remove (ma_head, |
1126 | ma_tail, | 1123 | ma_tail, |
@@ -1172,73 +1169,39 @@ perform_dht_put_monitor (const struct GNUNET_IDENTITY_PrivateKey *key, | |||
1172 | GNUNET_free (emsg); | 1169 | GNUNET_free (emsg); |
1173 | } | 1170 | } |
1174 | 1171 | ||
1175 | if (cache_keys) | 1172 | GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key, |
1176 | GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create2 (key, | 1173 | expire_pub, |
1177 | expire_pub, | 1174 | label, |
1178 | label, | 1175 | rd_public, |
1179 | rd_public, | 1176 | rd_public_count, |
1180 | rd_public_count, | 1177 | &block)); |
1181 | &block)); | ||
1182 | else | ||
1183 | GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key, | ||
1184 | expire_pub, | ||
1185 | label, | ||
1186 | rd_public, | ||
1187 | rd_public_count, | ||
1188 | &block)); | ||
1189 | if (NULL == block) | 1178 | if (NULL == block) |
1190 | { | 1179 | { |
1191 | GNUNET_break (0); | 1180 | GNUNET_break (0); |
1192 | return NULL; /* whoops */ | 1181 | return NULL; /* whoops */ |
1193 | } | 1182 | } |
1194 | if (rd_count != rd_public_count) | 1183 | if (rd_count != rd_public_count) |
1195 | GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key, | 1184 | GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key, |
1196 | expire, | 1185 | expire, |
1197 | label, | 1186 | label, |
1198 | rd, | 1187 | rd, |
1199 | rd_count, | 1188 | rd_count, |
1200 | &block_priv)); | 1189 | & |
1190 | block_priv)); | ||
1201 | else | 1191 | else |
1202 | block_priv = block; | 1192 | block_priv = block; |
1203 | block_size = GNUNET_GNSRECORD_block_get_size (block); | 1193 | block_size = GNUNET_GNSRECORD_block_get_size (block); |
1204 | GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); | 1194 | GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); |
1205 | struct OpenSignJob *job = GNUNET_new (struct OpenSignJob); | 1195 | struct OpenSignJob *job = GNUNET_new (struct OpenSignJob); |
1206 | job->block = GNUNET_malloc (block_size); // FIXME this does not need to be copied, can be freed by worker | 1196 | job->block = block; |
1207 | memcpy (job->block, block, block_size); | 1197 | job->block_size = block_size; |
1198 | job->block_priv = block_priv; | ||
1208 | job->zone = *key; | 1199 | job->zone = *key; |
1200 | job->ma = ma; | ||
1209 | job->label = GNUNET_strdup (label); | 1201 | job->label = GNUNET_strdup (label); |
1202 | job->expire_pub = expire_pub; | ||
1210 | GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job); | 1203 | GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job); |
1211 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); | 1204 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); |
1212 | GNUNET_GNSRECORD_query_from_private_key (key, | ||
1213 | label, | ||
1214 | &query); | ||
1215 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); | ||
1216 | GNUNET_STATISTICS_update (statistics, | ||
1217 | "DHT put operations initiated", | ||
1218 | 1, | ||
1219 | GNUNET_NO); | ||
1220 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1221 | "Storing %u public of %u record(s) for label `%s' in DHT with expiration `%s' under key %s\n", | ||
1222 | rd_public_count, | ||
1223 | rd_count, | ||
1224 | label, | ||
1225 | GNUNET_STRINGS_absolute_time_to_string (expire), | ||
1226 | GNUNET_h2s (&query)); | ||
1227 | ret = GNUNET_DHT_put (dht_handle, | ||
1228 | &query, | ||
1229 | DHT_GNS_REPLICATION_LEVEL, | ||
1230 | GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, | ||
1231 | GNUNET_BLOCK_TYPE_GNS_NAMERECORD, | ||
1232 | block_size, | ||
1233 | block, | ||
1234 | expire_pub, | ||
1235 | &dht_put_monitor_continuation, | ||
1236 | ma); | ||
1237 | refresh_block (block_priv); | ||
1238 | if (block != block_priv) | ||
1239 | GNUNET_free (block_priv); | ||
1240 | GNUNET_free (block); | ||
1241 | return ret; | ||
1242 | } | 1205 | } |
1243 | 1206 | ||
1244 | /** | 1207 | /** |
@@ -1277,41 +1240,26 @@ handle_monitor_event (void *cls, | |||
1277 | 1); | 1240 | 1); |
1278 | return; /* nothing to do */ | 1241 | return; /* nothing to do */ |
1279 | } | 1242 | } |
1280 | ma = GNUNET_new (struct DhtPutActivity); | 1243 | if (dht_queue_length >= DHT_QUEUE_LIMIT) |
1281 | ma->start_date = GNUNET_TIME_absolute_get (); | ||
1282 | ma->ph = perform_dht_put_monitor (zone, | ||
1283 | label, | ||
1284 | rd, | ||
1285 | rd_count, | ||
1286 | expire, | ||
1287 | ma); | ||
1288 | if (NULL == ma->ph) | ||
1289 | { | 1244 | { |
1290 | /* PUT failed, do not remember operation */ | 1245 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1291 | GNUNET_free (ma); | 1246 | "DHT PUT queue length exceeded (%u), aborting PUT\n", |
1292 | GNUNET_NAMESTORE_zone_monitor_next (zmon, | 1247 | DHT_QUEUE_LIMIT); |
1293 | 1); | ||
1294 | return; | 1248 | return; |
1295 | } | 1249 | } |
1250 | ma = GNUNET_new (struct DhtPutActivity); | ||
1251 | perform_dht_put_monitor (zone, | ||
1252 | label, | ||
1253 | rd, | ||
1254 | rd_count, | ||
1255 | expire, | ||
1256 | ma); | ||
1257 | GNUNET_NAMESTORE_zone_monitor_next (zmon, | ||
1258 | 1); | ||
1296 | GNUNET_CONTAINER_DLL_insert_tail (ma_head, | 1259 | GNUNET_CONTAINER_DLL_insert_tail (ma_head, |
1297 | ma_tail, | 1260 | ma_tail, |
1298 | ma); | 1261 | ma); |
1299 | ma_queue_length++; | 1262 | ma_queue_length++; |
1300 | if (ma_queue_length > DHT_QUEUE_LIMIT) | ||
1301 | { | ||
1302 | ma = ma_head; | ||
1303 | GNUNET_CONTAINER_DLL_remove (ma_head, | ||
1304 | ma_tail, | ||
1305 | ma); | ||
1306 | GNUNET_DHT_put_cancel (ma->ph); | ||
1307 | ma_queue_length--; | ||
1308 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1309 | "DHT PUT unconfirmed after %s, aborting PUT\n", | ||
1310 | GNUNET_STRINGS_relative_time_to_string ( | ||
1311 | GNUNET_TIME_absolute_get_duration (ma->start_date), | ||
1312 | GNUNET_YES)); | ||
1313 | GNUNET_free (ma); | ||
1314 | } | ||
1315 | } | 1263 | } |
1316 | 1264 | ||
1317 | 1265 | ||
@@ -1351,12 +1299,15 @@ sign_worker (void *) | |||
1351 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); | 1299 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); |
1352 | if (NULL != job) | 1300 | if (NULL != job) |
1353 | { | 1301 | { |
1302 | GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block); | ||
1303 | if (job->block != job->block_priv) | ||
1304 | GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block_priv); | ||
1354 | GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); | 1305 | GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); |
1355 | GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job); | 1306 | GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job); |
1356 | GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); | 1307 | GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); |
1357 | job = NULL; | 1308 | job = NULL; |
1358 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1309 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1359 | "Done, notifying main thread throug pipe!\n"); | 1310 | "Done, notifying main thread through pipe!\n"); |
1360 | GNUNET_DISK_file_write (fh, "!", 1); | 1311 | GNUNET_DISK_file_write (fh, "!", 1); |
1361 | } | 1312 | } |
1362 | else { | 1313 | else { |