aboutsummaryrefslogtreecommitdiff
path: root/src/zonemaster/gnunet-service-zonemaster.c
diff options
context:
space:
mode:
authorMartin Schanzenbach <schanzen@gnunet.org>2022-10-20 17:01:48 +0900
committerMartin Schanzenbach <schanzen@gnunet.org>2022-10-20 17:01:48 +0900
commit64aefd7b6fb27b8625af12783201f3c87da41f47 (patch)
tree77b582d595b498909df446b3dd7ab326053b9a36 /src/zonemaster/gnunet-service-zonemaster.c
parent3d7d23db1764973179fe9fc0013b942692c47df5 (diff)
downloadgnunet-64aefd7b6fb27b8625af12783201f3c87da41f47.tar.gz
gnunet-64aefd7b6fb27b8625af12783201f3c87da41f47.zip
ZONEMASTER: Use parallel worker thread for GNS block signing
Diffstat (limited to 'src/zonemaster/gnunet-service-zonemaster.c')
-rw-r--r--src/zonemaster/gnunet-service-zonemaster.c311
1 files changed, 261 insertions, 50 deletions
diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c
index f5c1d781b..42b3abf91 100644
--- a/src/zonemaster/gnunet-service-zonemaster.c
+++ b/src/zonemaster/gnunet-service-zonemaster.c
@@ -97,6 +97,81 @@
97#define DHT_GNS_REPLICATION_LEVEL 5 97#define DHT_GNS_REPLICATION_LEVEL 5
98 98
99/** 99/**
100 * Our workers
101 */
102static pthread_t * worker;
103
104/**
105 * Lock for the open jobs queue.
106 */
107static pthread_mutex_t jobs_lock;
108
109/**
110 * Lock for the finished results queue.
111 */
112static pthread_mutex_t results_lock;
113
114/**
115 * For threads to know we are shutting down
116 */
117static int in_shutdown = GNUNET_NO;
118
119/**
120 * Our notification pipe
121 */
122static struct GNUNET_DISK_PipeHandle *notification_pipe;
123
124/**
125 * Pipe read task
126 */
127static struct GNUNET_SCHEDULER_Task *pipe_read_task;
128
129struct OpenSignJob
130{
131
132 struct OpenSignJob *next;
133
134 struct OpenSignJob *prev;
135
136 struct GNUNET_IDENTITY_PrivateKey zone;
137
138 struct GNUNET_GNSRECORD_Block *block;
139
140 struct GNUNET_GNSRECORD_Block *block_priv;
141
142 struct DhtPutActivity *ma;
143
144 size_t block_size;
145
146 struct GNUNET_TIME_Absolute expire_pub;
147
148 char *label;
149
150};
151
152
153/**
154 * DLL
155 */
156static struct OpenSignJob *jobs_head;
157
158/**
159 * DLL
160 */
161static struct OpenSignJob *jobs_tail;
162
163/**
164 * DLL
165 */
166static struct OpenSignJob *results_head;
167
168/**
169 * DLL
170 */
171static struct OpenSignJob *results_tail;
172
173
174/**
100 * Handle for DHT PUT activity triggered from the namestore monitor. 175 * Handle for DHT PUT activity triggered from the namestore monitor.
101 */ 176 */
102struct DhtPutActivity 177struct DhtPutActivity
@@ -319,8 +394,13 @@ shutdown_task (void *cls)
319 struct CacheOperation *cop; 394 struct CacheOperation *cop;
320 395
321 (void) cls; 396 (void) cls;
397 in_shutdown == GNUNET_YES;
322 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 398 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
323 "Shutting down!\n"); 399 "Shutting down!\n");
400 if (NULL != notification_pipe)
401 GNUNET_DISK_pipe_close (notification_pipe);
402 if (NULL != pipe_read_task)
403 GNUNET_SCHEDULER_cancel (pipe_read_task);
324 while (NULL != (cop = cop_head)) 404 while (NULL != (cop = cop_head))
325 { 405 {
326 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 406 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -332,7 +412,8 @@ shutdown_task (void *cls)
332 412
333 while (NULL != (ma = it_head)) 413 while (NULL != (ma = it_head))
334 { 414 {
335 GNUNET_DHT_put_cancel (ma->ph); 415 if (NULL != ma->ph)
416 GNUNET_DHT_put_cancel (ma->ph);
336 dht_queue_length--; 417 dht_queue_length--;
337 GNUNET_CONTAINER_DLL_remove (it_head, 418 GNUNET_CONTAINER_DLL_remove (it_head,
338 it_tail, 419 it_tail,
@@ -682,6 +763,16 @@ dht_put_continuation (void *cls)
682 GNUNET_free (ma); 763 GNUNET_free (ma);
683} 764}
684 765
766static void
767free_job (struct OpenSignJob *job)
768{
769 if (job->block != job->block_priv)
770 GNUNET_free (job->block_priv);
771 GNUNET_free (job->block);
772 if (NULL != job->label)
773 GNUNET_free (job->label);
774 GNUNET_free (job);
775}
685 776
686 777
687/** 778/**
@@ -760,35 +851,86 @@ perform_dht_put (const struct GNUNET_IDENTITY_PrivateKey *key,
760 else 851 else
761 block_priv = block; 852 block_priv = block;
762 block_size = GNUNET_GNSRECORD_block_get_size (block); 853 block_size = GNUNET_GNSRECORD_block_get_size (block);
763 GNUNET_GNSRECORD_query_from_private_key (key, 854 GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
764 label, 855 struct OpenSignJob *job = GNUNET_new (struct OpenSignJob);
856 job->block = GNUNET_malloc (block_size); // FIXME this does not need to be copied, can be freed by worker
857 memcpy (job->block, block, block_size);
858 job->block_size = block_size;
859 job->block_priv = block_priv;
860 job->zone = *key;
861 job->ma = ma;
862 job->label = GNUNET_strdup (label);
863 job->expire_pub = expire_pub;
864 GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job);
865 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
866 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
867 "Storing %u record(s) for label `%s' in DHT with expiration `%s'\n",
868 rd_public_count,
869 label,
870 GNUNET_STRINGS_absolute_time_to_string (expire));
871 num_public_records++;
872}
873
874static void
875notification_pipe_cb (void *cls);
876
877static void
878initiate_put_from_pipe_trigger (void *cls)
879{
880 struct GNUNET_HashCode query;
881 struct OpenSignJob *job;
882
883 pipe_read_task = NULL;
884 GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
885 job = results_head;
886 if (NULL == job)
887 {
888 GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
889 const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle (
890 notification_pipe,
891 GNUNET_DISK_PIPE_END_READ);
892 pipe_read_task =
893 GNUNET_SCHEDULER_add_read_file (
894 GNUNET_TIME_UNIT_FOREVER_REL,
895 np_fh,
896 notification_pipe_cb,
897 NULL);
898 return;
899 }
900 GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job);
901 GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
902 GNUNET_GNSRECORD_query_from_private_key (&job->zone,
903 job->label,
765 &query); 904 &query);
766 GNUNET_STATISTICS_update (statistics, 905 GNUNET_STATISTICS_update (statistics,
767 "DHT put operations initiated", 906 "DHT put operations initiated",
768 1, 907 1,
769 GNUNET_NO); 908 GNUNET_NO);
770 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 909 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
771 "Storing %u record(s) for label `%s' in DHT with expiration `%s' under key %s\n", 910 "Storing record(s) for label `%s' in DHT under key %s\n",
772 rd_public_count, 911 job->label,
773 label,
774 GNUNET_STRINGS_absolute_time_to_string (expire),
775 GNUNET_h2s (&query)); 912 GNUNET_h2s (&query));
776 num_public_records++; 913 job->ma->ph = GNUNET_DHT_put (dht_handle,
777 ret = GNUNET_DHT_put (dht_handle, 914 &query,
778 &query, 915 DHT_GNS_REPLICATION_LEVEL,
779 DHT_GNS_REPLICATION_LEVEL, 916 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
780 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, 917 GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
781 GNUNET_BLOCK_TYPE_GNS_NAMERECORD, 918 job->block_size,
782 block_size, 919 job->block,
783 block, 920 job->expire_pub,
784 expire_pub, 921 &dht_put_continuation,
785 &dht_put_continuation, 922 job->ma);
786 ma); 923 if (NULL == job->ma->ph)
787 refresh_block (block_priv); 924 {
788 if (block != block_priv) 925 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
789 GNUNET_free (block_priv); 926 "Could not perform DHT PUT, is the DHT running?\n");
790 GNUNET_free (block); 927 GNUNET_free (job->ma);
791 return ret; 928 free_job (job);
929 return;
930 }
931 refresh_block (job->block_priv);
932 free_job (job);
933 return;
792} 934}
793 935
794 936
@@ -907,45 +1049,29 @@ put_gns_record (void *cls,
907 /* We got a set of records to publish */ 1049 /* We got a set of records to publish */
908 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1050 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909 "Starting DHT PUT\n"); 1051 "Starting DHT PUT\n");
910
911 ma = GNUNET_new (struct DhtPutActivity);
912 ma->start_date = GNUNET_TIME_absolute_get ();
913 ma->ph = perform_dht_put (key,
914 label,
915 rd,
916 rd_count,
917 expire,
918 ma);
919 put_cnt++; 1052 put_cnt++;
920 if (0 == put_cnt % DELTA_INTERVAL) 1053 if (0 == put_cnt % DELTA_INTERVAL)
921 update_velocity (DELTA_INTERVAL); 1054 update_velocity (DELTA_INTERVAL);
922 check_zone_namestore_next (); 1055 check_zone_namestore_next ();
923 if (NULL == ma->ph) 1056 if (dht_queue_length >= DHT_QUEUE_LIMIT)
924 { 1057 {
925 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1058 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
926 "Could not perform DHT PUT, is the DHT running?\n"); 1059 "DHT PUT queue length exceeded (%u), aborting PUT\n",
927 GNUNET_free (ma); 1060 DHT_QUEUE_LIMIT);
928 return; 1061 return;
929 } 1062 }
1063
1064 ma = GNUNET_new (struct DhtPutActivity);
1065 perform_dht_put (key,
1066 label,
1067 rd,
1068 rd_count,
1069 expire,
1070 ma);
930 dht_queue_length++; 1071 dht_queue_length++;
931 GNUNET_CONTAINER_DLL_insert_tail (it_head, 1072 GNUNET_CONTAINER_DLL_insert_tail (it_head,
932 it_tail, 1073 it_tail,
933 ma); 1074 ma);
934 if (dht_queue_length > DHT_QUEUE_LIMIT)
935 {
936 ma = it_head;
937 GNUNET_CONTAINER_DLL_remove (it_head,
938 it_tail,
939 ma);
940 GNUNET_DHT_put_cancel (ma->ph);
941 dht_queue_length--;
942 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
943 "DHT PUT unconfirmed after %s, aborting PUT\n",
944 GNUNET_STRINGS_relative_time_to_string (
945 GNUNET_TIME_absolute_get_duration (ma->start_date),
946 GNUNET_YES));
947 GNUNET_free (ma);
948 }
949} 1075}
950 1076
951/** 1077/**
@@ -1075,9 +1201,18 @@ perform_dht_put_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
1075 else 1201 else
1076 block_priv = block; 1202 block_priv = block;
1077 block_size = GNUNET_GNSRECORD_block_get_size (block); 1203 block_size = GNUNET_GNSRECORD_block_get_size (block);
1204 GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
1205 struct OpenSignJob *job = GNUNET_new (struct OpenSignJob);
1206 job->block = GNUNET_malloc (block_size); // FIXME this does not need to be copied, can be freed by worker
1207 memcpy (job->block, block, block_size);
1208 job->zone = *key;
1209 job->label = GNUNET_strdup (label);
1210 GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job);
1211 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
1078 GNUNET_GNSRECORD_query_from_private_key (key, 1212 GNUNET_GNSRECORD_query_from_private_key (key,
1079 label, 1213 label,
1080 &query); 1214 &query);
1215 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
1081 GNUNET_STATISTICS_update (statistics, 1216 GNUNET_STATISTICS_update (statistics,
1082 "DHT put operations initiated", 1217 "DHT put operations initiated",
1083 1, 1218 1,
@@ -1196,6 +1331,48 @@ handle_monitor_error (void *cls)
1196 GNUNET_NO); 1331 GNUNET_NO);
1197} 1332}
1198 1333
1334static void*
1335sign_worker (void *)
1336{
1337 struct OpenSignJob *job;
1338 const struct GNUNET_DISK_FileHandle *fh;
1339
1340 fh = GNUNET_DISK_pipe_handle (notification_pipe, GNUNET_DISK_PIPE_END_WRITE);
1341 while (GNUNET_YES != in_shutdown)
1342 {
1343 GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
1344 if (NULL != jobs_head)
1345 {
1346 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1347 "Taking on Job for %s\n", jobs_head->label);
1348 job = jobs_head;
1349 GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job);
1350 }
1351 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
1352 if (NULL != job)
1353 {
1354 GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
1355 GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job);
1356 GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
1357 job = NULL;
1358 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1359 "Done, notifying main thread throug pipe!\n");
1360 GNUNET_DISK_file_write (fh, "!", 1);
1361 }
1362 else {
1363 sleep (1);
1364 }
1365 }
1366 return NULL;
1367}
1368
1369static void
1370notification_pipe_cb (void *cls)
1371{
1372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1373 "Received wake up notification through pipe, checking results\n");
1374 GNUNET_SCHEDULER_add_now (&initiate_put_from_pipe_trigger, NULL);
1375}
1199 1376
1200/** 1377/**
1201 * Perform zonemaster duties: watch namestore, publish records. 1378 * Perform zonemaster duties: watch namestore, publish records.
@@ -1305,6 +1482,40 @@ run (void *cls,
1305 1482
1306 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 1483 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1307 NULL); 1484 NULL);
1485
1486 notification_pipe = GNUNET_DISK_pipe (GNUNET_DISK_PF_NONE);
1487 const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle (
1488 notification_pipe,
1489 GNUNET_DISK_PIPE_END_READ);
1490 pipe_read_task = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
1491 np_fh,
1492 notification_pipe_cb, NULL);
1493
1494 long long unsigned int worker_count = 1;
1495 if (GNUNET_OK !=
1496 GNUNET_CONFIGURATION_get_value_number (c,
1497 "zonemaster",
1498 "WORKER_COUNT",
1499 &worker_count))
1500 {
1501 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1502 "Number of workers not defined falling back to 1\n");
1503 }
1504 worker = GNUNET_malloc (sizeof (pthread_t) * worker_count);
1505 /** Start worker */
1506 for (int i = 0; i < worker_count; i++)
1507 {
1508 if (0 !=
1509 pthread_create (&worker[i],
1510 NULL,
1511 &sign_worker,
1512 NULL))
1513 {
1514 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
1515 "pthread_create");
1516 GNUNET_SCHEDULER_shutdown ();
1517 }
1518 }
1308} 1519}
1309 1520
1310 1521