diff options
author | Martin Schanzenbach <schanzen@gnunet.org> | 2022-10-20 17:01:48 +0900 |
---|---|---|
committer | Martin Schanzenbach <schanzen@gnunet.org> | 2022-10-20 17:01:48 +0900 |
commit | 64aefd7b6fb27b8625af12783201f3c87da41f47 (patch) | |
tree | 77b582d595b498909df446b3dd7ab326053b9a36 /src/zonemaster/gnunet-service-zonemaster.c | |
parent | 3d7d23db1764973179fe9fc0013b942692c47df5 (diff) | |
download | gnunet-64aefd7b6fb27b8625af12783201f3c87da41f47.tar.gz gnunet-64aefd7b6fb27b8625af12783201f3c87da41f47.zip |
ZONEMASTER: Use parallel worker thread for GNS block signing
Diffstat (limited to 'src/zonemaster/gnunet-service-zonemaster.c')
-rw-r--r-- | src/zonemaster/gnunet-service-zonemaster.c | 311 |
1 files changed, 261 insertions, 50 deletions
diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c index f5c1d781b..42b3abf91 100644 --- a/src/zonemaster/gnunet-service-zonemaster.c +++ b/src/zonemaster/gnunet-service-zonemaster.c | |||
@@ -97,6 +97,81 @@ | |||
97 | #define DHT_GNS_REPLICATION_LEVEL 5 | 97 | #define DHT_GNS_REPLICATION_LEVEL 5 |
98 | 98 | ||
99 | /** | 99 | /** |
100 | * Our workers | ||
101 | */ | ||
102 | static pthread_t * worker; | ||
103 | |||
104 | /** | ||
105 | * Lock for the open jobs queue. | ||
106 | */ | ||
107 | static pthread_mutex_t jobs_lock; | ||
108 | |||
109 | /** | ||
110 | * Lock for the finished results queue. | ||
111 | */ | ||
112 | static pthread_mutex_t results_lock; | ||
113 | |||
114 | /** | ||
115 | * For threads to know we are shutting down | ||
116 | */ | ||
117 | static int in_shutdown = GNUNET_NO; | ||
118 | |||
119 | /** | ||
120 | * Our notification pipe | ||
121 | */ | ||
122 | static struct GNUNET_DISK_PipeHandle *notification_pipe; | ||
123 | |||
124 | /** | ||
125 | * Pipe read task | ||
126 | */ | ||
127 | static struct GNUNET_SCHEDULER_Task *pipe_read_task; | ||
128 | |||
129 | struct OpenSignJob | ||
130 | { | ||
131 | |||
132 | struct OpenSignJob *next; | ||
133 | |||
134 | struct OpenSignJob *prev; | ||
135 | |||
136 | struct GNUNET_IDENTITY_PrivateKey zone; | ||
137 | |||
138 | struct GNUNET_GNSRECORD_Block *block; | ||
139 | |||
140 | struct GNUNET_GNSRECORD_Block *block_priv; | ||
141 | |||
142 | struct DhtPutActivity *ma; | ||
143 | |||
144 | size_t block_size; | ||
145 | |||
146 | struct GNUNET_TIME_Absolute expire_pub; | ||
147 | |||
148 | char *label; | ||
149 | |||
150 | }; | ||
151 | |||
152 | |||
153 | /** | ||
154 | * DLL | ||
155 | */ | ||
156 | static struct OpenSignJob *jobs_head; | ||
157 | |||
158 | /** | ||
159 | * DLL | ||
160 | */ | ||
161 | static struct OpenSignJob *jobs_tail; | ||
162 | |||
163 | /** | ||
164 | * DLL | ||
165 | */ | ||
166 | static struct OpenSignJob *results_head; | ||
167 | |||
168 | /** | ||
169 | * DLL | ||
170 | */ | ||
171 | static struct OpenSignJob *results_tail; | ||
172 | |||
173 | |||
174 | /** | ||
100 | * Handle for DHT PUT activity triggered from the namestore monitor. | 175 | * Handle for DHT PUT activity triggered from the namestore monitor. |
101 | */ | 176 | */ |
102 | struct DhtPutActivity | 177 | struct DhtPutActivity |
@@ -319,8 +394,13 @@ shutdown_task (void *cls) | |||
319 | struct CacheOperation *cop; | 394 | struct CacheOperation *cop; |
320 | 395 | ||
321 | (void) cls; | 396 | (void) cls; |
397 | in_shutdown == GNUNET_YES; | ||
322 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 398 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
323 | "Shutting down!\n"); | 399 | "Shutting down!\n"); |
400 | if (NULL != notification_pipe) | ||
401 | GNUNET_DISK_pipe_close (notification_pipe); | ||
402 | if (NULL != pipe_read_task) | ||
403 | GNUNET_SCHEDULER_cancel (pipe_read_task); | ||
324 | while (NULL != (cop = cop_head)) | 404 | while (NULL != (cop = cop_head)) |
325 | { | 405 | { |
326 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 406 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
@@ -332,7 +412,8 @@ shutdown_task (void *cls) | |||
332 | 412 | ||
333 | while (NULL != (ma = it_head)) | 413 | while (NULL != (ma = it_head)) |
334 | { | 414 | { |
335 | GNUNET_DHT_put_cancel (ma->ph); | 415 | if (NULL != ma->ph) |
416 | GNUNET_DHT_put_cancel (ma->ph); | ||
336 | dht_queue_length--; | 417 | dht_queue_length--; |
337 | GNUNET_CONTAINER_DLL_remove (it_head, | 418 | GNUNET_CONTAINER_DLL_remove (it_head, |
338 | it_tail, | 419 | it_tail, |
@@ -682,6 +763,16 @@ dht_put_continuation (void *cls) | |||
682 | GNUNET_free (ma); | 763 | GNUNET_free (ma); |
683 | } | 764 | } |
684 | 765 | ||
766 | static void | ||
767 | free_job (struct OpenSignJob *job) | ||
768 | { | ||
769 | if (job->block != job->block_priv) | ||
770 | GNUNET_free (job->block_priv); | ||
771 | GNUNET_free (job->block); | ||
772 | if (NULL != job->label) | ||
773 | GNUNET_free (job->label); | ||
774 | GNUNET_free (job); | ||
775 | } | ||
685 | 776 | ||
686 | 777 | ||
687 | /** | 778 | /** |
@@ -760,35 +851,86 @@ perform_dht_put (const struct GNUNET_IDENTITY_PrivateKey *key, | |||
760 | else | 851 | else |
761 | block_priv = block; | 852 | block_priv = block; |
762 | block_size = GNUNET_GNSRECORD_block_get_size (block); | 853 | block_size = GNUNET_GNSRECORD_block_get_size (block); |
763 | GNUNET_GNSRECORD_query_from_private_key (key, | 854 | GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); |
764 | label, | 855 | struct OpenSignJob *job = GNUNET_new (struct OpenSignJob); |
856 | job->block = GNUNET_malloc (block_size); // FIXME this does not need to be copied, can be freed by worker | ||
857 | memcpy (job->block, block, block_size); | ||
858 | job->block_size = block_size; | ||
859 | job->block_priv = block_priv; | ||
860 | job->zone = *key; | ||
861 | job->ma = ma; | ||
862 | job->label = GNUNET_strdup (label); | ||
863 | job->expire_pub = expire_pub; | ||
864 | GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job); | ||
865 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); | ||
866 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
867 | "Storing %u record(s) for label `%s' in DHT with expiration `%s'\n", | ||
868 | rd_public_count, | ||
869 | label, | ||
870 | GNUNET_STRINGS_absolute_time_to_string (expire)); | ||
871 | num_public_records++; | ||
872 | } | ||
873 | |||
874 | static void | ||
875 | notification_pipe_cb (void *cls); | ||
876 | |||
877 | static void | ||
878 | initiate_put_from_pipe_trigger (void *cls) | ||
879 | { | ||
880 | struct GNUNET_HashCode query; | ||
881 | struct OpenSignJob *job; | ||
882 | |||
883 | pipe_read_task = NULL; | ||
884 | GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); | ||
885 | job = results_head; | ||
886 | if (NULL == job) | ||
887 | { | ||
888 | GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); | ||
889 | const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle ( | ||
890 | notification_pipe, | ||
891 | GNUNET_DISK_PIPE_END_READ); | ||
892 | pipe_read_task = | ||
893 | GNUNET_SCHEDULER_add_read_file ( | ||
894 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
895 | np_fh, | ||
896 | notification_pipe_cb, | ||
897 | NULL); | ||
898 | return; | ||
899 | } | ||
900 | GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job); | ||
901 | GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); | ||
902 | GNUNET_GNSRECORD_query_from_private_key (&job->zone, | ||
903 | job->label, | ||
765 | &query); | 904 | &query); |
766 | GNUNET_STATISTICS_update (statistics, | 905 | GNUNET_STATISTICS_update (statistics, |
767 | "DHT put operations initiated", | 906 | "DHT put operations initiated", |
768 | 1, | 907 | 1, |
769 | GNUNET_NO); | 908 | GNUNET_NO); |
770 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 909 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
771 | "Storing %u record(s) for label `%s' in DHT with expiration `%s' under key %s\n", | 910 | "Storing record(s) for label `%s' in DHT under key %s\n", |
772 | rd_public_count, | 911 | job->label, |
773 | label, | ||
774 | GNUNET_STRINGS_absolute_time_to_string (expire), | ||
775 | GNUNET_h2s (&query)); | 912 | GNUNET_h2s (&query)); |
776 | num_public_records++; | 913 | job->ma->ph = GNUNET_DHT_put (dht_handle, |
777 | ret = GNUNET_DHT_put (dht_handle, | 914 | &query, |
778 | &query, | 915 | DHT_GNS_REPLICATION_LEVEL, |
779 | DHT_GNS_REPLICATION_LEVEL, | 916 | GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, |
780 | GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, | 917 | GNUNET_BLOCK_TYPE_GNS_NAMERECORD, |
781 | GNUNET_BLOCK_TYPE_GNS_NAMERECORD, | 918 | job->block_size, |
782 | block_size, | 919 | job->block, |
783 | block, | 920 | job->expire_pub, |
784 | expire_pub, | 921 | &dht_put_continuation, |
785 | &dht_put_continuation, | 922 | job->ma); |
786 | ma); | 923 | if (NULL == job->ma->ph) |
787 | refresh_block (block_priv); | 924 | { |
788 | if (block != block_priv) | 925 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
789 | GNUNET_free (block_priv); | 926 | "Could not perform DHT PUT, is the DHT running?\n"); |
790 | GNUNET_free (block); | 927 | GNUNET_free (job->ma); |
791 | return ret; | 928 | free_job (job); |
929 | return; | ||
930 | } | ||
931 | refresh_block (job->block_priv); | ||
932 | free_job (job); | ||
933 | return; | ||
792 | } | 934 | } |
793 | 935 | ||
794 | 936 | ||
@@ -907,45 +1049,29 @@ put_gns_record (void *cls, | |||
907 | /* We got a set of records to publish */ | 1049 | /* We got a set of records to publish */ |
908 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1050 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
909 | "Starting DHT PUT\n"); | 1051 | "Starting DHT PUT\n"); |
910 | |||
911 | ma = GNUNET_new (struct DhtPutActivity); | ||
912 | ma->start_date = GNUNET_TIME_absolute_get (); | ||
913 | ma->ph = perform_dht_put (key, | ||
914 | label, | ||
915 | rd, | ||
916 | rd_count, | ||
917 | expire, | ||
918 | ma); | ||
919 | put_cnt++; | 1052 | put_cnt++; |
920 | if (0 == put_cnt % DELTA_INTERVAL) | 1053 | if (0 == put_cnt % DELTA_INTERVAL) |
921 | update_velocity (DELTA_INTERVAL); | 1054 | update_velocity (DELTA_INTERVAL); |
922 | check_zone_namestore_next (); | 1055 | check_zone_namestore_next (); |
923 | if (NULL == ma->ph) | 1056 | if (dht_queue_length >= DHT_QUEUE_LIMIT) |
924 | { | 1057 | { |
925 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1058 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
926 | "Could not perform DHT PUT, is the DHT running?\n"); | 1059 | "DHT PUT queue length exceeded (%u), aborting PUT\n", |
927 | GNUNET_free (ma); | 1060 | DHT_QUEUE_LIMIT); |
928 | return; | 1061 | return; |
929 | } | 1062 | } |
1063 | |||
1064 | ma = GNUNET_new (struct DhtPutActivity); | ||
1065 | perform_dht_put (key, | ||
1066 | label, | ||
1067 | rd, | ||
1068 | rd_count, | ||
1069 | expire, | ||
1070 | ma); | ||
930 | dht_queue_length++; | 1071 | dht_queue_length++; |
931 | GNUNET_CONTAINER_DLL_insert_tail (it_head, | 1072 | GNUNET_CONTAINER_DLL_insert_tail (it_head, |
932 | it_tail, | 1073 | it_tail, |
933 | ma); | 1074 | ma); |
934 | if (dht_queue_length > DHT_QUEUE_LIMIT) | ||
935 | { | ||
936 | ma = it_head; | ||
937 | GNUNET_CONTAINER_DLL_remove (it_head, | ||
938 | it_tail, | ||
939 | ma); | ||
940 | GNUNET_DHT_put_cancel (ma->ph); | ||
941 | dht_queue_length--; | ||
942 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
943 | "DHT PUT unconfirmed after %s, aborting PUT\n", | ||
944 | GNUNET_STRINGS_relative_time_to_string ( | ||
945 | GNUNET_TIME_absolute_get_duration (ma->start_date), | ||
946 | GNUNET_YES)); | ||
947 | GNUNET_free (ma); | ||
948 | } | ||
949 | } | 1075 | } |
950 | 1076 | ||
951 | /** | 1077 | /** |
@@ -1075,9 +1201,18 @@ perform_dht_put_monitor (const struct GNUNET_IDENTITY_PrivateKey *key, | |||
1075 | else | 1201 | else |
1076 | block_priv = block; | 1202 | block_priv = block; |
1077 | block_size = GNUNET_GNSRECORD_block_get_size (block); | 1203 | block_size = GNUNET_GNSRECORD_block_get_size (block); |
1204 | GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); | ||
1205 | struct OpenSignJob *job = GNUNET_new (struct OpenSignJob); | ||
1206 | job->block = GNUNET_malloc (block_size); // FIXME this does not need to be copied, can be freed by worker | ||
1207 | memcpy (job->block, block, block_size); | ||
1208 | job->zone = *key; | ||
1209 | job->label = GNUNET_strdup (label); | ||
1210 | GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job); | ||
1211 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); | ||
1078 | GNUNET_GNSRECORD_query_from_private_key (key, | 1212 | GNUNET_GNSRECORD_query_from_private_key (key, |
1079 | label, | 1213 | label, |
1080 | &query); | 1214 | &query); |
1215 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); | ||
1081 | GNUNET_STATISTICS_update (statistics, | 1216 | GNUNET_STATISTICS_update (statistics, |
1082 | "DHT put operations initiated", | 1217 | "DHT put operations initiated", |
1083 | 1, | 1218 | 1, |
@@ -1196,6 +1331,48 @@ handle_monitor_error (void *cls) | |||
1196 | GNUNET_NO); | 1331 | GNUNET_NO); |
1197 | } | 1332 | } |
1198 | 1333 | ||
1334 | static void* | ||
1335 | sign_worker (void *) | ||
1336 | { | ||
1337 | struct OpenSignJob *job; | ||
1338 | const struct GNUNET_DISK_FileHandle *fh; | ||
1339 | |||
1340 | fh = GNUNET_DISK_pipe_handle (notification_pipe, GNUNET_DISK_PIPE_END_WRITE); | ||
1341 | while (GNUNET_YES != in_shutdown) | ||
1342 | { | ||
1343 | GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); | ||
1344 | if (NULL != jobs_head) | ||
1345 | { | ||
1346 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1347 | "Taking on Job for %s\n", jobs_head->label); | ||
1348 | job = jobs_head; | ||
1349 | GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job); | ||
1350 | } | ||
1351 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); | ||
1352 | if (NULL != job) | ||
1353 | { | ||
1354 | GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); | ||
1355 | GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job); | ||
1356 | GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); | ||
1357 | job = NULL; | ||
1358 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1359 | "Done, notifying main thread throug pipe!\n"); | ||
1360 | GNUNET_DISK_file_write (fh, "!", 1); | ||
1361 | } | ||
1362 | else { | ||
1363 | sleep (1); | ||
1364 | } | ||
1365 | } | ||
1366 | return NULL; | ||
1367 | } | ||
1368 | |||
1369 | static void | ||
1370 | notification_pipe_cb (void *cls) | ||
1371 | { | ||
1372 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1373 | "Received wake up notification through pipe, checking results\n"); | ||
1374 | GNUNET_SCHEDULER_add_now (&initiate_put_from_pipe_trigger, NULL); | ||
1375 | } | ||
1199 | 1376 | ||
1200 | /** | 1377 | /** |
1201 | * Perform zonemaster duties: watch namestore, publish records. | 1378 | * Perform zonemaster duties: watch namestore, publish records. |
@@ -1305,6 +1482,40 @@ run (void *cls, | |||
1305 | 1482 | ||
1306 | GNUNET_SCHEDULER_add_shutdown (&shutdown_task, | 1483 | GNUNET_SCHEDULER_add_shutdown (&shutdown_task, |
1307 | NULL); | 1484 | NULL); |
1485 | |||
1486 | notification_pipe = GNUNET_DISK_pipe (GNUNET_DISK_PF_NONE); | ||
1487 | const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle ( | ||
1488 | notification_pipe, | ||
1489 | GNUNET_DISK_PIPE_END_READ); | ||
1490 | pipe_read_task = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, | ||
1491 | np_fh, | ||
1492 | notification_pipe_cb, NULL); | ||
1493 | |||
1494 | long long unsigned int worker_count = 1; | ||
1495 | if (GNUNET_OK != | ||
1496 | GNUNET_CONFIGURATION_get_value_number (c, | ||
1497 | "zonemaster", | ||
1498 | "WORKER_COUNT", | ||
1499 | &worker_count)) | ||
1500 | { | ||
1501 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1502 | "Number of workers not defined falling back to 1\n"); | ||
1503 | } | ||
1504 | worker = GNUNET_malloc (sizeof (pthread_t) * worker_count); | ||
1505 | /** Start worker */ | ||
1506 | for (int i = 0; i < worker_count; i++) | ||
1507 | { | ||
1508 | if (0 != | ||
1509 | pthread_create (&worker[i], | ||
1510 | NULL, | ||
1511 | &sign_worker, | ||
1512 | NULL)) | ||
1513 | { | ||
1514 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, | ||
1515 | "pthread_create"); | ||
1516 | GNUNET_SCHEDULER_shutdown (); | ||
1517 | } | ||
1518 | } | ||
1308 | } | 1519 | } |
1309 | 1520 | ||
1310 | 1521 | ||