diff options
author | Martin Schanzenbach <schanzen@gnunet.org> | 2022-10-26 13:22:45 +0900 |
---|---|---|
committer | Martin Schanzenbach <schanzen@gnunet.org> | 2022-10-26 13:22:45 +0900 |
commit | 27929f3f87f6c94e74d88d3ad49602bb546a0e4c (patch) | |
tree | bcc14bd612bd2ac79469e20835156f9a6eb30953 /src/zonemaster/gnunet-service-zonemaster.c | |
parent | 85b690809e76aaddb83fd9be96a69426706ec617 (diff) | |
download | gnunet-27929f3f87f6c94e74d88d3ad49602bb546a0e4c.tar.gz gnunet-27929f3f87f6c94e74d88d3ad49602bb546a0e4c.zip |
-cleanup
Diffstat (limited to 'src/zonemaster/gnunet-service-zonemaster.c')
-rw-r--r-- | src/zonemaster/gnunet-service-zonemaster.c | 281 |
1 files changed, 132 insertions, 149 deletions
diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c index 863716a44..8e5d157fd 100644 --- a/src/zonemaster/gnunet-service-zonemaster.c +++ b/src/zonemaster/gnunet-service-zonemaster.c | |||
@@ -97,19 +97,19 @@ | |||
97 | static pthread_t * worker; | 97 | static pthread_t * worker; |
98 | 98 | ||
99 | /** | 99 | /** |
100 | * Lock for the open jobs queue. | 100 | * Lock for the sign jobs queue. |
101 | */ | 101 | */ |
102 | static pthread_mutex_t jobs_lock; | 102 | static pthread_mutex_t sign_jobs_lock; |
103 | 103 | ||
104 | /** | 104 | /** |
105 | * Lock for the finished results queue. | 105 | * Lock for the DHT put jobs queue. |
106 | */ | 106 | */ |
107 | static pthread_mutex_t results_lock; | 107 | static pthread_mutex_t sign_results_lock; |
108 | 108 | ||
109 | /** | 109 | /** |
110 | * Wait condition on new jobs | 110 | * Wait condition on new sign jobs |
111 | */ | 111 | */ |
112 | static pthread_cond_t empty_jobs; | 112 | static pthread_cond_t sign_jobs_cond; |
113 | 113 | ||
114 | /** | 114 | /** |
115 | * For threads to know we are shutting down | 115 | * For threads to know we are shutting down |
@@ -136,77 +136,94 @@ static struct GNUNET_DISK_PipeHandle *notification_pipe; | |||
136 | */ | 136 | */ |
137 | static struct GNUNET_SCHEDULER_Task *pipe_read_task; | 137 | static struct GNUNET_SCHEDULER_Task *pipe_read_task; |
138 | 138 | ||
139 | struct OpenSignJob | 139 | struct RecordPublicationJob |
140 | { | 140 | { |
141 | 141 | ||
142 | struct OpenSignJob *next; | 142 | /** |
143 | * DLL | ||
144 | */ | ||
145 | struct RecordPublicationJob *next; | ||
143 | 146 | ||
144 | struct OpenSignJob *prev; | 147 | /** |
148 | * DLL | ||
149 | */ | ||
150 | struct RecordPublicationJob *prev; | ||
145 | 151 | ||
152 | /** | ||
153 | * The zone key to sign the block with | ||
154 | */ | ||
146 | struct GNUNET_IDENTITY_PrivateKey zone; | 155 | struct GNUNET_IDENTITY_PrivateKey zone; |
147 | 156 | ||
157 | /** | ||
158 | * The block to sign | ||
159 | */ | ||
148 | struct GNUNET_GNSRECORD_Block *block; | 160 | struct GNUNET_GNSRECORD_Block *block; |
149 | 161 | ||
162 | /** | ||
163 | * The private block to sign, may point to block in case | ||
164 | * the public and private blocks are the same. | ||
165 | */ | ||
150 | struct GNUNET_GNSRECORD_Block *block_priv; | 166 | struct GNUNET_GNSRECORD_Block *block_priv; |
151 | 167 | ||
152 | struct DhtPutActivity *ma; | 168 | /** |
153 | 169 | * The size of the public block for the DHT put. | |
170 | */ | ||
154 | size_t block_size; | 171 | size_t block_size; |
155 | 172 | ||
173 | /** | ||
174 | * The expiration time of the public block for the DHT put. | ||
175 | */ | ||
156 | struct GNUNET_TIME_Absolute expire_pub; | 176 | struct GNUNET_TIME_Absolute expire_pub; |
157 | 177 | ||
178 | /** | ||
179 | * The label of the block needed for signing | ||
180 | */ | ||
158 | char *label; | 181 | char *label; |
159 | 182 | ||
183 | /** | ||
184 | * Handle for the DHT PUT operation. | ||
185 | */ | ||
186 | struct GNUNET_DHT_PutHandle *ph; | ||
187 | |||
188 | /** | ||
189 | * When was this PUT initiated? | ||
190 | */ | ||
191 | struct GNUNET_TIME_Absolute start_date; | ||
160 | }; | 192 | }; |
161 | 193 | ||
162 | 194 | ||
163 | /** | 195 | /** |
164 | * DLL | 196 | * The DLL for workers to retrieve open jobs that require |
197 | * signing of blocks. | ||
165 | */ | 198 | */ |
166 | static struct OpenSignJob *jobs_head; | 199 | static struct RecordPublicationJob *sign_jobs_head; |
167 | 200 | ||
168 | /** | 201 | /** |
169 | * DLL | 202 | * See above |
170 | */ | 203 | */ |
171 | static struct OpenSignJob *jobs_tail; | 204 | static struct RecordPublicationJob *sign_jobs_tail; |
172 | 205 | ||
173 | /** | 206 | /** |
174 | * DLL | 207 | * The DLL for workers to place jobs that are signed. |
175 | */ | 208 | */ |
176 | static struct OpenSignJob *results_head; | 209 | static struct RecordPublicationJob *sign_results_head; |
177 | 210 | ||
178 | /** | 211 | /** |
179 | * DLL | 212 | * See above |
180 | */ | 213 | */ |
181 | static struct OpenSignJob *results_tail; | 214 | static struct RecordPublicationJob *sign_results_tail; |
182 | 215 | ||
183 | 216 | ||
184 | /** | 217 | /** |
185 | * Handle for DHT PUT activity triggered from the namestore monitor. | 218 | * The DLL for jobs currently in the process of being dispatched into the DHT. |
186 | */ | 219 | */ |
187 | struct DhtPutActivity | 220 | static struct RecordPublicationJob *dht_jobs_head; |
188 | { | ||
189 | /** | ||
190 | * Kept in a DLL. | ||
191 | */ | ||
192 | struct DhtPutActivity *next; | ||
193 | 221 | ||
194 | /** | 222 | /** |
195 | * Kept in a DLL. | 223 | * See above |
196 | */ | 224 | */ |
197 | struct DhtPutActivity *prev; | 225 | static struct RecordPublicationJob *dht_jobs_tail; |
198 | |||
199 | /** | ||
200 | * Handle for the DHT PUT operation. | ||
201 | */ | ||
202 | struct GNUNET_DHT_PutHandle *ph; | ||
203 | |||
204 | /** | ||
205 | * When was this PUT initiated? | ||
206 | */ | ||
207 | struct GNUNET_TIME_Absolute start_date; | ||
208 | 226 | ||
209 | }; | ||
210 | 227 | ||
211 | /** | 228 | /** |
212 | * Pending operation on the namecache. | 229 | * Pending operation on the namecache. |
@@ -268,16 +285,6 @@ static int disable_namecache; | |||
268 | static struct GNUNET_NAMESTORE_ZoneIterator *namestore_iter; | 285 | static struct GNUNET_NAMESTORE_ZoneIterator *namestore_iter; |
269 | 286 | ||
270 | /** | 287 | /** |
271 | * Head of iteration put activities; kept in a DLL. | ||
272 | */ | ||
273 | static struct DhtPutActivity *it_head; | ||
274 | |||
275 | /** | ||
276 | * Tail of iteration put activities; kept in a DLL. | ||
277 | */ | ||
278 | static struct DhtPutActivity *it_tail; | ||
279 | |||
280 | /** | ||
281 | * Number of entries in the job queue #jobs_head. | 288 | * Number of entries in the job queue #jobs_head. |
282 | */ | 289 | */ |
283 | static unsigned int job_queue_length; | 290 | static unsigned int job_queue_length; |
@@ -378,7 +385,7 @@ static struct CacheOperation *cop_tail; | |||
378 | 385 | ||
379 | 386 | ||
380 | static void | 387 | static void |
381 | free_job (struct OpenSignJob *job) | 388 | free_job (struct RecordPublicationJob *job) |
382 | { | 389 | { |
383 | if (job->block != job->block_priv) | 390 | if (job->block != job->block_priv) |
384 | GNUNET_free (job->block_priv); | 391 | GNUNET_free (job->block_priv); |
@@ -397,9 +404,8 @@ free_job (struct OpenSignJob *job) | |||
397 | static void | 404 | static void |
398 | shutdown_task (void *cls) | 405 | shutdown_task (void *cls) |
399 | { | 406 | { |
400 | struct DhtPutActivity *ma; | ||
401 | struct CacheOperation *cop; | 407 | struct CacheOperation *cop; |
402 | struct OpenSignJob *job; | 408 | struct RecordPublicationJob *job; |
403 | 409 | ||
404 | (void) cls; | 410 | (void) cls; |
405 | in_shutdown = GNUNET_YES; | 411 | in_shutdown = GNUNET_YES; |
@@ -417,36 +423,35 @@ shutdown_task (void *cls) | |||
417 | GNUNET_CONTAINER_DLL_remove (cop_head, cop_tail, cop); | 423 | GNUNET_CONTAINER_DLL_remove (cop_head, cop_tail, cop); |
418 | GNUNET_free (cop); | 424 | GNUNET_free (cop); |
419 | } | 425 | } |
420 | GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); | 426 | GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock)); |
421 | while (NULL != (job = jobs_head)) | 427 | while (NULL != (job = sign_jobs_head)) |
422 | { | 428 | { |
423 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 429 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
424 | "Removing incomplete jobs\n"); | 430 | "Removing incomplete jobs\n"); |
425 | GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job); | 431 | GNUNET_CONTAINER_DLL_remove (sign_jobs_head, sign_jobs_tail, job); |
426 | job_queue_length--; | 432 | job_queue_length--; |
427 | free_job (job); | 433 | free_job (job); |
428 | } | 434 | } |
429 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); | 435 | GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock)); |
430 | GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); | 436 | GNUNET_assert (0 == pthread_mutex_lock (&sign_results_lock)); |
431 | while (NULL != (job = results_head)) | 437 | while (NULL != (job = sign_results_head)) |
432 | { | 438 | { |
433 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 439 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
434 | "Removing incomplete jobs\n"); | 440 | "Removing incomplete jobs\n"); |
435 | GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job); | 441 | GNUNET_CONTAINER_DLL_remove (sign_results_head, sign_results_tail, job); |
436 | free_job (job); | 442 | free_job (job); |
437 | } | 443 | } |
438 | GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); | 444 | GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock)); |
439 | 445 | while (NULL != (job = dht_jobs_head)) | |
440 | while (NULL != (ma = it_head)) | ||
441 | { | 446 | { |
442 | if (NULL != ma->ph) | 447 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
443 | GNUNET_DHT_put_cancel (ma->ph); | 448 | "Removing incomplete jobs\n"); |
444 | GNUNET_CONTAINER_DLL_remove (it_head, | 449 | GNUNET_CONTAINER_DLL_remove (dht_jobs_head, dht_jobs_tail, job); |
445 | it_tail, | 450 | if (NULL != job->ph) |
446 | ma); | 451 | GNUNET_DHT_put_cancel (job->ph); |
447 | GNUNET_free (ma); | 452 | free_job (job); |
448 | } | 453 | } |
449 | if (NULL != statistics) | 454 | if (NULL != statistics) |
450 | { | 455 | { |
451 | GNUNET_STATISTICS_destroy (statistics, | 456 | GNUNET_STATISTICS_destroy (statistics, |
452 | GNUNET_NO); | 457 | GNUNET_NO); |
@@ -763,12 +768,11 @@ check_zone_namestore_next () | |||
763 | /** | 768 | /** |
764 | * Continuation called from DHT once the PUT operation is done. | 769 | * Continuation called from DHT once the PUT operation is done. |
765 | * | 770 | * |
766 | * @param cls a `struct DhtPutActivity` | ||
767 | */ | 771 | */ |
768 | static void | 772 | static void |
769 | dht_put_continuation (void *cls) | 773 | dht_put_continuation (void *cls) |
770 | { | 774 | { |
771 | struct DhtPutActivity *ma = cls; | 775 | struct RecordPublicationJob *job = cls; |
772 | 776 | ||
773 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 777 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
774 | "PUT complete\n"); | 778 | "PUT complete\n"); |
@@ -788,10 +792,10 @@ dht_put_continuation (void *cls) | |||
788 | } | 792 | } |
789 | } | 793 | } |
790 | job_queue_length--; | 794 | job_queue_length--; |
791 | GNUNET_CONTAINER_DLL_remove (it_head, | 795 | GNUNET_CONTAINER_DLL_remove (dht_jobs_head, |
792 | it_tail, | 796 | dht_jobs_tail, |
793 | ma); | 797 | job); |
794 | GNUNET_free (ma); | 798 | free_job (job); |
795 | } | 799 | } |
796 | 800 | ||
797 | 801 | ||
@@ -810,8 +814,7 @@ dispatch_job (const struct GNUNET_IDENTITY_PrivateKey *key, | |||
810 | const char *label, | 814 | const char *label, |
811 | const struct GNUNET_GNSRECORD_Data *rd, | 815 | const struct GNUNET_GNSRECORD_Data *rd, |
812 | unsigned int rd_count, | 816 | unsigned int rd_count, |
813 | const struct GNUNET_TIME_Absolute expire, | 817 | const struct GNUNET_TIME_Absolute expire) |
814 | struct DhtPutActivity *ma) | ||
815 | { | 818 | { |
816 | struct GNUNET_GNSRECORD_Data rd_public[rd_count]; | 819 | struct GNUNET_GNSRECORD_Data rd_public[rd_count]; |
817 | struct GNUNET_GNSRECORD_Block *block; | 820 | struct GNUNET_GNSRECORD_Block *block; |
@@ -858,18 +861,17 @@ dispatch_job (const struct GNUNET_IDENTITY_PrivateKey *key, | |||
858 | else | 861 | else |
859 | block_priv = block; | 862 | block_priv = block; |
860 | block_size = GNUNET_GNSRECORD_block_get_size (block); | 863 | block_size = GNUNET_GNSRECORD_block_get_size (block); |
861 | GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); | 864 | GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock)); |
862 | struct OpenSignJob *job = GNUNET_new (struct OpenSignJob); | 865 | struct RecordPublicationJob *job = GNUNET_new (struct RecordPublicationJob); |
863 | job->block = block; | 866 | job->block = block; |
864 | job->block_size = block_size; | 867 | job->block_size = block_size; |
865 | job->block_priv = block_priv; | 868 | job->block_priv = block_priv; |
866 | job->zone = *key; | 869 | job->zone = *key; |
867 | job->ma = ma; | ||
868 | job->label = GNUNET_strdup (label); | 870 | job->label = GNUNET_strdup (label); |
869 | job->expire_pub = expire_pub; | 871 | job->expire_pub = expire_pub; |
870 | GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job); | 872 | GNUNET_CONTAINER_DLL_insert (sign_jobs_head, sign_jobs_tail, job); |
871 | GNUNET_assert (0 == pthread_cond_signal (&empty_jobs)); | 873 | GNUNET_assert (0 == pthread_cond_signal (&sign_jobs_cond)); |
872 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); | 874 | GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock)); |
873 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 875 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
874 | "Creating job with %u record(s) for label `%s', expiration `%s'\n", | 876 | "Creating job with %u record(s) for label `%s', expiration `%s'\n", |
875 | rd_public_count, | 877 | rd_public_count, |
@@ -887,7 +889,7 @@ static void | |||
887 | initiate_put_from_pipe_trigger (void *cls) | 889 | initiate_put_from_pipe_trigger (void *cls) |
888 | { | 890 | { |
889 | struct GNUNET_HashCode query; | 891 | struct GNUNET_HashCode query; |
890 | struct OpenSignJob *job; | 892 | struct RecordPublicationJob *job; |
891 | const struct GNUNET_DISK_FileHandle *np_fh; | 893 | const struct GNUNET_DISK_FileHandle *np_fh; |
892 | char buf[100]; | 894 | char buf[100]; |
893 | ssize_t nf_count; | 895 | ssize_t nf_count; |
@@ -907,20 +909,37 @@ initiate_put_from_pipe_trigger (void *cls) | |||
907 | (long long) nf_count); | 909 | (long long) nf_count); |
908 | while (true) | 910 | while (true) |
909 | { | 911 | { |
910 | GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); | 912 | GNUNET_assert (0 == pthread_mutex_lock (&sign_results_lock)); |
911 | if (NULL == results_head) | 913 | if (NULL == sign_results_head) |
912 | { | 914 | { |
913 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 915 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
914 | "No more results. Back to sleep.\n"); | 916 | "No more results. Back to sleep.\n"); |
915 | GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); | 917 | GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock)); |
916 | return; | 918 | return; |
917 | } | 919 | } |
918 | job = results_head; | 920 | job = sign_results_head; |
919 | GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job); | 921 | GNUNET_CONTAINER_DLL_remove (sign_results_head, sign_results_tail, job); |
920 | GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); | 922 | GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock)); |
921 | GNUNET_GNSRECORD_query_from_private_key (&job->zone, | 923 | GNUNET_GNSRECORD_query_from_private_key (&job->zone, |
922 | job->label, | 924 | job->label, |
923 | &query); | 925 | &query); |
926 | job->ph = GNUNET_DHT_put (dht_handle, | ||
927 | &query, | ||
928 | DHT_GNS_REPLICATION_LEVEL, | ||
929 | GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, | ||
930 | GNUNET_BLOCK_TYPE_GNS_NAMERECORD, | ||
931 | job->block_size, | ||
932 | job->block, | ||
933 | job->expire_pub, | ||
934 | &dht_put_continuation, | ||
935 | job); | ||
936 | if (NULL == job->ph) | ||
937 | { | ||
938 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
939 | "Could not perform DHT PUT, is the DHT running?\n"); | ||
940 | free_job (job); | ||
941 | return; | ||
942 | } | ||
924 | GNUNET_STATISTICS_update (statistics, | 943 | GNUNET_STATISTICS_update (statistics, |
925 | "DHT put operations initiated", | 944 | "DHT put operations initiated", |
926 | 1, | 945 | 1, |
@@ -929,26 +948,8 @@ initiate_put_from_pipe_trigger (void *cls) | |||
929 | "Storing record(s) for label `%s' in DHT under key %s\n", | 948 | "Storing record(s) for label `%s' in DHT under key %s\n", |
930 | job->label, | 949 | job->label, |
931 | GNUNET_h2s (&query)); | 950 | GNUNET_h2s (&query)); |
932 | job->ma->ph = GNUNET_DHT_put (dht_handle, | ||
933 | &query, | ||
934 | DHT_GNS_REPLICATION_LEVEL, | ||
935 | GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, | ||
936 | GNUNET_BLOCK_TYPE_GNS_NAMERECORD, | ||
937 | job->block_size, | ||
938 | job->block, | ||
939 | job->expire_pub, | ||
940 | &dht_put_continuation, | ||
941 | job->ma); | ||
942 | if (NULL == job->ma->ph) | ||
943 | { | ||
944 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
945 | "Could not perform DHT PUT, is the DHT running?\n"); | ||
946 | GNUNET_free (job->ma); | ||
947 | free_job (job); | ||
948 | return; | ||
949 | } | ||
950 | refresh_block (job->block_priv); | 951 | refresh_block (job->block_priv); |
951 | free_job (job); | 952 | GNUNET_CONTAINER_DLL_insert (dht_jobs_head, dht_jobs_tail, job); |
952 | } | 953 | } |
953 | } | 954 | } |
954 | 955 | ||
@@ -1042,8 +1043,6 @@ handle_record (void *cls, | |||
1042 | const struct GNUNET_GNSRECORD_Data *rd, | 1043 | const struct GNUNET_GNSRECORD_Data *rd, |
1043 | struct GNUNET_TIME_Absolute expire) | 1044 | struct GNUNET_TIME_Absolute expire) |
1044 | { | 1045 | { |
1045 | struct DhtPutActivity *ma; | ||
1046 | |||
1047 | (void) cls; | 1046 | (void) cls; |
1048 | ns_iteration_left--; | 1047 | ns_iteration_left--; |
1049 | if (0 == rd_count) | 1048 | if (0 == rd_count) |
@@ -1072,16 +1071,11 @@ handle_record (void *cls, | |||
1072 | put_cnt++; | 1071 | put_cnt++; |
1073 | if (0 == put_cnt % DELTA_INTERVAL) | 1072 | if (0 == put_cnt % DELTA_INTERVAL) |
1074 | update_velocity (DELTA_INTERVAL); | 1073 | update_velocity (DELTA_INTERVAL); |
1075 | ma = GNUNET_new (struct DhtPutActivity); | ||
1076 | dispatch_job (key, | 1074 | dispatch_job (key, |
1077 | label, | 1075 | label, |
1078 | rd, | 1076 | rd, |
1079 | rd_count, | 1077 | rd_count, |
1080 | expire, | 1078 | expire); |
1081 | ma); | ||
1082 | GNUNET_CONTAINER_DLL_insert_tail (it_head, | ||
1083 | it_tail, | ||
1084 | ma); | ||
1085 | job_queue_length++; | 1079 | job_queue_length++; |
1086 | if (job_queue_length >= JOB_QUEUE_LIMIT) | 1080 | if (job_queue_length >= JOB_QUEUE_LIMIT) |
1087 | { | 1081 | { |
@@ -1136,16 +1130,13 @@ publish_zone_dht_start (void *cls) | |||
1136 | * @param label label to store under | 1130 | * @param label label to store under |
1137 | * @param rd_public public record data | 1131 | * @param rd_public public record data |
1138 | * @param rd_public_count number of records in @a rd_public | 1132 | * @param rd_public_count number of records in @a rd_public |
1139 | * @param ma handle for the PUT operation | ||
1140 | * @return DHT PUT handle, NULL on error | ||
1141 | */ | 1133 | */ |
1142 | static void | 1134 | static void |
1143 | dispatch_job_monitor (const struct GNUNET_IDENTITY_PrivateKey *key, | 1135 | dispatch_job_monitor (const struct GNUNET_IDENTITY_PrivateKey *key, |
1144 | const char *label, | 1136 | const char *label, |
1145 | const struct GNUNET_GNSRECORD_Data *rd, | 1137 | const struct GNUNET_GNSRECORD_Data *rd, |
1146 | unsigned int rd_count, | 1138 | unsigned int rd_count, |
1147 | struct GNUNET_TIME_Absolute expire, | 1139 | struct GNUNET_TIME_Absolute expire) |
1148 | struct DhtPutActivity *ma) | ||
1149 | { | 1140 | { |
1150 | struct GNUNET_GNSRECORD_Data rd_public[rd_count]; | 1141 | struct GNUNET_GNSRECORD_Data rd_public[rd_count]; |
1151 | struct GNUNET_GNSRECORD_Block *block; | 1142 | struct GNUNET_GNSRECORD_Block *block; |
@@ -1192,18 +1183,17 @@ dispatch_job_monitor (const struct GNUNET_IDENTITY_PrivateKey *key, | |||
1192 | else | 1183 | else |
1193 | block_priv = block; | 1184 | block_priv = block; |
1194 | block_size = GNUNET_GNSRECORD_block_get_size (block); | 1185 | block_size = GNUNET_GNSRECORD_block_get_size (block); |
1195 | GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); | 1186 | GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock)); |
1196 | struct OpenSignJob *job = GNUNET_new (struct OpenSignJob); | 1187 | struct RecordPublicationJob *job = GNUNET_new (struct RecordPublicationJob); |
1197 | job->block = block; | 1188 | job->block = block; |
1198 | job->block_size = block_size; | 1189 | job->block_size = block_size; |
1199 | job->block_priv = block_priv; | 1190 | job->block_priv = block_priv; |
1200 | job->zone = *key; | 1191 | job->zone = *key; |
1201 | job->ma = ma; | ||
1202 | job->label = GNUNET_strdup (label); | 1192 | job->label = GNUNET_strdup (label); |
1203 | job->expire_pub = expire_pub; | 1193 | job->expire_pub = expire_pub; |
1204 | GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job); | 1194 | GNUNET_CONTAINER_DLL_insert (sign_jobs_head, sign_jobs_tail, job); |
1205 | GNUNET_assert (0 == pthread_cond_signal (&empty_jobs)); | 1195 | GNUNET_assert (0 == pthread_cond_signal (&sign_jobs_cond)); |
1206 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); | 1196 | GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock)); |
1207 | } | 1197 | } |
1208 | 1198 | ||
1209 | 1199 | ||
@@ -1226,8 +1216,6 @@ handle_monitor_event (void *cls, | |||
1226 | const struct GNUNET_GNSRECORD_Data *rd, | 1216 | const struct GNUNET_GNSRECORD_Data *rd, |
1227 | struct GNUNET_TIME_Absolute expire) | 1217 | struct GNUNET_TIME_Absolute expire) |
1228 | { | 1218 | { |
1229 | struct DhtPutActivity *ma; | ||
1230 | |||
1231 | (void) cls; | 1219 | (void) cls; |
1232 | GNUNET_STATISTICS_update (statistics, | 1220 | GNUNET_STATISTICS_update (statistics, |
1233 | "Namestore monitor events received", | 1221 | "Namestore monitor events received", |
@@ -1243,16 +1231,11 @@ handle_monitor_event (void *cls, | |||
1243 | 1); | 1231 | 1); |
1244 | return; /* nothing to do */ | 1232 | return; /* nothing to do */ |
1245 | } | 1233 | } |
1246 | ma = GNUNET_new (struct DhtPutActivity); | ||
1247 | dispatch_job_monitor (zone, | 1234 | dispatch_job_monitor (zone, |
1248 | label, | 1235 | label, |
1249 | rd, | 1236 | rd, |
1250 | rd_count, | 1237 | rd_count, |
1251 | expire, | 1238 | expire); |
1252 | ma); | ||
1253 | GNUNET_CONTAINER_DLL_insert_tail (it_head, | ||
1254 | it_tail, | ||
1255 | ma); | ||
1256 | job_queue_length++; | 1239 | job_queue_length++; |
1257 | if (job_queue_length >= JOB_QUEUE_LIMIT) | 1240 | if (job_queue_length >= JOB_QUEUE_LIMIT) |
1258 | { | 1241 | { |
@@ -1287,31 +1270,31 @@ handle_monitor_error (void *cls) | |||
1287 | static void* | 1270 | static void* |
1288 | sign_worker (void *cls) | 1271 | sign_worker (void *cls) |
1289 | { | 1272 | { |
1290 | struct OpenSignJob *job; | 1273 | struct RecordPublicationJob *job; |
1291 | const struct GNUNET_DISK_FileHandle *fh; | 1274 | const struct GNUNET_DISK_FileHandle *fh; |
1292 | 1275 | ||
1293 | fh = GNUNET_DISK_pipe_handle (notification_pipe, GNUNET_DISK_PIPE_END_WRITE); | 1276 | fh = GNUNET_DISK_pipe_handle (notification_pipe, GNUNET_DISK_PIPE_END_WRITE); |
1294 | while (GNUNET_YES != in_shutdown) | 1277 | while (GNUNET_YES != in_shutdown) |
1295 | { | 1278 | { |
1296 | GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); | 1279 | GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock)); |
1297 | while (NULL == jobs_head) | 1280 | while (NULL == sign_jobs_head) |
1298 | GNUNET_assert (0 == pthread_cond_wait (&empty_jobs, &jobs_lock)); | 1281 | GNUNET_assert (0 == pthread_cond_wait (&sign_jobs_cond, &sign_jobs_lock)); |
1299 | if (GNUNET_YES == in_shutdown) | 1282 | if (GNUNET_YES == in_shutdown) |
1300 | { | 1283 | { |
1301 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); | 1284 | GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock)); |
1302 | return NULL; | 1285 | return NULL; |
1303 | } | 1286 | } |
1304 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1287 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1305 | "Taking on Job for %s\n", jobs_head->label); | 1288 | "Taking on Job for %s\n", sign_jobs_head->label); |
1306 | job = jobs_head; | 1289 | job = sign_jobs_head; |
1307 | GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job); | 1290 | GNUNET_CONTAINER_DLL_remove (sign_jobs_head, sign_jobs_tail, job); |
1308 | GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); | 1291 | GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock)); |
1309 | GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block); | 1292 | GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block); |
1310 | if (job->block != job->block_priv) | 1293 | if (job->block != job->block_priv) |
1311 | GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block_priv); | 1294 | GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block_priv); |
1312 | GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); | 1295 | GNUNET_assert (0 == pthread_mutex_lock (&sign_results_lock)); |
1313 | GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job); | 1296 | GNUNET_CONTAINER_DLL_insert (sign_results_head, sign_results_tail, job); |
1314 | GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); | 1297 | GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock)); |
1315 | job = NULL; | 1298 | job = NULL; |
1316 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1299 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1317 | "Done, notifying main thread through pipe!\n"); | 1300 | "Done, notifying main thread through pipe!\n"); |