aboutsummaryrefslogtreecommitdiff
path: root/src/zonemaster/gnunet-service-zonemaster.c
diff options
context:
space:
mode:
authorMartin Schanzenbach <schanzen@gnunet.org>2022-10-20 17:31:48 +0900
committerMartin Schanzenbach <schanzen@gnunet.org>2022-10-20 17:31:48 +0900
commit01b5953cb3d3c7f072115ffa7b72884c3614cbae (patch)
tree9e024e600f962dc9910de4205d2852374e9e2d86 /src/zonemaster/gnunet-service-zonemaster.c
parent64aefd7b6fb27b8625af12783201f3c87da41f47 (diff)
downloadgnunet-01b5953cb3d3c7f072115ffa7b72884c3614cbae.tar.gz
gnunet-01b5953cb3d3c7f072115ffa7b72884c3614cbae.zip
-fix properly emptying queue; add parallelization to monitor
Diffstat (limited to 'src/zonemaster/gnunet-service-zonemaster.c')
-rw-r--r--src/zonemaster/gnunet-service-zonemaster.c185
1 files changed, 68 insertions, 117 deletions
diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c
index 42b3abf91..fb55fd718 100644
--- a/src/zonemaster/gnunet-service-zonemaster.c
+++ b/src/zonemaster/gnunet-service-zonemaster.c
@@ -423,7 +423,8 @@ shutdown_task (void *cls)
423 } 423 }
424 while (NULL != (ma = ma_head)) 424 while (NULL != (ma = ma_head))
425 { 425 {
426 GNUNET_DHT_put_cancel (ma->ph); 426 if (NULL != ma->ph)
427 GNUNET_DHT_put_cancel (ma->ph);
427 ma_queue_length--; 428 ma_queue_length--;
428 GNUNET_CONTAINER_DLL_remove (ma_head, 429 GNUNET_CONTAINER_DLL_remove (ma_head,
429 ma_tail, 430 ma_tail,
@@ -818,36 +819,25 @@ perform_dht_put (const struct GNUNET_IDENTITY_PrivateKey *key,
818 GNUNET_free (emsg); 819 GNUNET_free (emsg);
819 } 820 }
820 821
821 if (cache_keys) 822 GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
822 { 823 expire_pub,
823 GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create2 (key, 824 label,
824 expire_pub, 825 rd_public,
825 label, 826 rd_public_count,
826 rd_public, 827 &block));
827 rd_public_count,
828 &block));
829 }
830 else
831 {
832 GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key,
833 expire_pub,
834 label,
835 rd_public,
836 rd_public_count,
837 &block));
838 }
839 if (NULL == block) 828 if (NULL == block)
840 { 829 {
841 GNUNET_break (0); 830 GNUNET_break (0);
842 return NULL; /* whoops */ 831 return NULL; /* whoops */
843 } 832 }
844 if (rd_count != rd_public_count) 833 if (rd_count != rd_public_count)
845 GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key, 834 GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
846 expire, 835 expire,
847 label, 836 label,
848 rd, 837 rd,
849 rd_count, 838 rd_count,
850 &block_priv)); 839 &
840 block_priv));
851 else 841 else
852 block_priv = block; 842 block_priv = block;
853 block_size = GNUNET_GNSRECORD_block_get_size (block); 843 block_size = GNUNET_GNSRECORD_block_get_size (block);
@@ -879,22 +869,31 @@ initiate_put_from_pipe_trigger (void *cls)
879{ 869{
880 struct GNUNET_HashCode query; 870 struct GNUNET_HashCode query;
881 struct OpenSignJob *job; 871 struct OpenSignJob *job;
872 const struct GNUNET_DISK_FileHandle *np_fh;
873 char buf[100];
874 ssize_t nf_count;
882 875
883 pipe_read_task = NULL; 876 pipe_read_task = NULL;
884 GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); 877 GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
885 job = results_head; 878 job = results_head;
879 np_fh = GNUNET_DISK_pipe_handle (notification_pipe,
880 GNUNET_DISK_PIPE_END_READ);
881 pipe_read_task =
882 GNUNET_SCHEDULER_add_read_file (
883 GNUNET_TIME_UNIT_FOREVER_REL,
884 np_fh,
885 notification_pipe_cb,
886 NULL);
887 /* empty queue */
888 while (GNUNET_SYSERR !=
889 (nf_count = GNUNET_DISK_file_read (np_fh, buf, sizeof (buf))))
890 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Read %lld notifications from pipe\n",
891 nf_count);
886 if (NULL == job) 892 if (NULL == job)
887 { 893 {
894 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
895 "Hmm... no results. Back to sleep.\n");
888 GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); 896 GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
889 const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle (
890 notification_pipe,
891 GNUNET_DISK_PIPE_END_READ);
892 pipe_read_task =
893 GNUNET_SCHEDULER_add_read_file (
894 GNUNET_TIME_UNIT_FOREVER_REL,
895 np_fh,
896 notification_pipe_cb,
897 NULL);
898 return; 897 return;
899 } 898 }
900 GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job); 899 GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job);
@@ -1119,8 +1118,6 @@ dht_put_monitor_continuation (void *cls)
1119{ 1118{
1120 struct DhtPutActivity *ma = cls; 1119 struct DhtPutActivity *ma = cls;
1121 1120
1122 GNUNET_NAMESTORE_zone_monitor_next (zmon,
1123 1);
1124 ma_queue_length--; 1121 ma_queue_length--;
1125 GNUNET_CONTAINER_DLL_remove (ma_head, 1122 GNUNET_CONTAINER_DLL_remove (ma_head,
1126 ma_tail, 1123 ma_tail,
@@ -1172,73 +1169,39 @@ perform_dht_put_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
1172 GNUNET_free (emsg); 1169 GNUNET_free (emsg);
1173 } 1170 }
1174 1171
1175 if (cache_keys) 1172 GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
1176 GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create2 (key, 1173 expire_pub,
1177 expire_pub, 1174 label,
1178 label, 1175 rd_public,
1179 rd_public, 1176 rd_public_count,
1180 rd_public_count, 1177 &block));
1181 &block));
1182 else
1183 GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key,
1184 expire_pub,
1185 label,
1186 rd_public,
1187 rd_public_count,
1188 &block));
1189 if (NULL == block) 1178 if (NULL == block)
1190 { 1179 {
1191 GNUNET_break (0); 1180 GNUNET_break (0);
1192 return NULL; /* whoops */ 1181 return NULL; /* whoops */
1193 } 1182 }
1194 if (rd_count != rd_public_count) 1183 if (rd_count != rd_public_count)
1195 GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key, 1184 GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
1196 expire, 1185 expire,
1197 label, 1186 label,
1198 rd, 1187 rd,
1199 rd_count, 1188 rd_count,
1200 &block_priv)); 1189 &
1190 block_priv));
1201 else 1191 else
1202 block_priv = block; 1192 block_priv = block;
1203 block_size = GNUNET_GNSRECORD_block_get_size (block); 1193 block_size = GNUNET_GNSRECORD_block_get_size (block);
1204 GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); 1194 GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
1205 struct OpenSignJob *job = GNUNET_new (struct OpenSignJob); 1195 struct OpenSignJob *job = GNUNET_new (struct OpenSignJob);
1206 job->block = GNUNET_malloc (block_size); // FIXME this does not need to be copied, can be freed by worker 1196 job->block = block;
1207 memcpy (job->block, block, block_size); 1197 job->block_size = block_size;
1198 job->block_priv = block_priv;
1208 job->zone = *key; 1199 job->zone = *key;
1200 job->ma = ma;
1209 job->label = GNUNET_strdup (label); 1201 job->label = GNUNET_strdup (label);
1202 job->expire_pub = expire_pub;
1210 GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job); 1203 GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job);
1211 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); 1204 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
1212 GNUNET_GNSRECORD_query_from_private_key (key,
1213 label,
1214 &query);
1215 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
1216 GNUNET_STATISTICS_update (statistics,
1217 "DHT put operations initiated",
1218 1,
1219 GNUNET_NO);
1220 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221 "Storing %u public of %u record(s) for label `%s' in DHT with expiration `%s' under key %s\n",
1222 rd_public_count,
1223 rd_count,
1224 label,
1225 GNUNET_STRINGS_absolute_time_to_string (expire),
1226 GNUNET_h2s (&query));
1227 ret = GNUNET_DHT_put (dht_handle,
1228 &query,
1229 DHT_GNS_REPLICATION_LEVEL,
1230 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
1231 GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
1232 block_size,
1233 block,
1234 expire_pub,
1235 &dht_put_monitor_continuation,
1236 ma);
1237 refresh_block (block_priv);
1238 if (block != block_priv)
1239 GNUNET_free (block_priv);
1240 GNUNET_free (block);
1241 return ret;
1242} 1205}
1243 1206
1244/** 1207/**
@@ -1277,41 +1240,26 @@ handle_monitor_event (void *cls,
1277 1); 1240 1);
1278 return; /* nothing to do */ 1241 return; /* nothing to do */
1279 } 1242 }
1280 ma = GNUNET_new (struct DhtPutActivity); 1243 if (dht_queue_length >= DHT_QUEUE_LIMIT)
1281 ma->start_date = GNUNET_TIME_absolute_get ();
1282 ma->ph = perform_dht_put_monitor (zone,
1283 label,
1284 rd,
1285 rd_count,
1286 expire,
1287 ma);
1288 if (NULL == ma->ph)
1289 { 1244 {
1290 /* PUT failed, do not remember operation */ 1245 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1291 GNUNET_free (ma); 1246 "DHT PUT queue length exceeded (%u), aborting PUT\n",
1292 GNUNET_NAMESTORE_zone_monitor_next (zmon, 1247 DHT_QUEUE_LIMIT);
1293 1);
1294 return; 1248 return;
1295 } 1249 }
1250 ma = GNUNET_new (struct DhtPutActivity);
1251 perform_dht_put_monitor (zone,
1252 label,
1253 rd,
1254 rd_count,
1255 expire,
1256 ma);
1257 GNUNET_NAMESTORE_zone_monitor_next (zmon,
1258 1);
1296 GNUNET_CONTAINER_DLL_insert_tail (ma_head, 1259 GNUNET_CONTAINER_DLL_insert_tail (ma_head,
1297 ma_tail, 1260 ma_tail,
1298 ma); 1261 ma);
1299 ma_queue_length++; 1262 ma_queue_length++;
1300 if (ma_queue_length > DHT_QUEUE_LIMIT)
1301 {
1302 ma = ma_head;
1303 GNUNET_CONTAINER_DLL_remove (ma_head,
1304 ma_tail,
1305 ma);
1306 GNUNET_DHT_put_cancel (ma->ph);
1307 ma_queue_length--;
1308 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1309 "DHT PUT unconfirmed after %s, aborting PUT\n",
1310 GNUNET_STRINGS_relative_time_to_string (
1311 GNUNET_TIME_absolute_get_duration (ma->start_date),
1312 GNUNET_YES));
1313 GNUNET_free (ma);
1314 }
1315} 1263}
1316 1264
1317 1265
@@ -1351,12 +1299,15 @@ sign_worker (void *)
1351 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); 1299 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
1352 if (NULL != job) 1300 if (NULL != job)
1353 { 1301 {
1302 GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block);
1303 if (job->block != job->block_priv)
1304 GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block_priv);
1354 GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); 1305 GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
1355 GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job); 1306 GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job);
1356 GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); 1307 GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
1357 job = NULL; 1308 job = NULL;
1358 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1309 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1359 "Done, notifying main thread throug pipe!\n"); 1310 "Done, notifying main thread through pipe!\n");
1360 GNUNET_DISK_file_write (fh, "!", 1); 1311 GNUNET_DISK_file_write (fh, "!", 1);
1361 } 1312 }
1362 else { 1313 else {