aboutsummaryrefslogtreecommitdiff
path: root/src/zonemaster/gnunet-service-zonemaster.c
diff options
context:
space:
mode:
authorMartin Schanzenbach <schanzen@gnunet.org>2022-10-26 13:22:45 +0900
committerMartin Schanzenbach <schanzen@gnunet.org>2022-10-26 13:22:45 +0900
commit27929f3f87f6c94e74d88d3ad49602bb546a0e4c (patch)
treebcc14bd612bd2ac79469e20835156f9a6eb30953 /src/zonemaster/gnunet-service-zonemaster.c
parent85b690809e76aaddb83fd9be96a69426706ec617 (diff)
downloadgnunet-27929f3f87f6c94e74d88d3ad49602bb546a0e4c.tar.gz
gnunet-27929f3f87f6c94e74d88d3ad49602bb546a0e4c.zip
-cleanup
Diffstat (limited to 'src/zonemaster/gnunet-service-zonemaster.c')
-rw-r--r--src/zonemaster/gnunet-service-zonemaster.c281
1 files changed, 132 insertions, 149 deletions
diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c
index 863716a44..8e5d157fd 100644
--- a/src/zonemaster/gnunet-service-zonemaster.c
+++ b/src/zonemaster/gnunet-service-zonemaster.c
@@ -97,19 +97,19 @@
97static pthread_t * worker; 97static pthread_t * worker;
98 98
99/** 99/**
100 * Lock for the open jobs queue. 100 * Lock for the sign jobs queue.
101 */ 101 */
102static pthread_mutex_t jobs_lock; 102static pthread_mutex_t sign_jobs_lock;
103 103
104/** 104/**
105 * Lock for the finished results queue. 105 * Lock for the DHT put jobs queue.
106 */ 106 */
107static pthread_mutex_t results_lock; 107static pthread_mutex_t sign_results_lock;
108 108
109/** 109/**
110 * Wait condition on new jobs 110 * Wait condition on new sign jobs
111 */ 111 */
112static pthread_cond_t empty_jobs; 112static pthread_cond_t sign_jobs_cond;
113 113
114/** 114/**
115 * For threads to know we are shutting down 115 * For threads to know we are shutting down
@@ -136,77 +136,94 @@ static struct GNUNET_DISK_PipeHandle *notification_pipe;
136 */ 136 */
137static struct GNUNET_SCHEDULER_Task *pipe_read_task; 137static struct GNUNET_SCHEDULER_Task *pipe_read_task;
138 138
139struct OpenSignJob 139struct RecordPublicationJob
140{ 140{
141 141
142 struct OpenSignJob *next; 142 /**
143 * DLL
144 */
145 struct RecordPublicationJob *next;
143 146
144 struct OpenSignJob *prev; 147 /**
148 * DLL
149 */
150 struct RecordPublicationJob *prev;
145 151
152 /**
153 * The zone key to sign the block with
154 */
146 struct GNUNET_IDENTITY_PrivateKey zone; 155 struct GNUNET_IDENTITY_PrivateKey zone;
147 156
157 /**
158 * The block to sign
159 */
148 struct GNUNET_GNSRECORD_Block *block; 160 struct GNUNET_GNSRECORD_Block *block;
149 161
162 /**
163 * The private block to sign, may point to block in case
164 * the public and private blocks are the same.
165 */
150 struct GNUNET_GNSRECORD_Block *block_priv; 166 struct GNUNET_GNSRECORD_Block *block_priv;
151 167
152 struct DhtPutActivity *ma; 168 /**
153 169 * The size of the public block for the DHT put.
170 */
154 size_t block_size; 171 size_t block_size;
155 172
173 /**
174 * The expiration time of the public block for the DHT put.
175 */
156 struct GNUNET_TIME_Absolute expire_pub; 176 struct GNUNET_TIME_Absolute expire_pub;
157 177
178 /**
179 * The label of the block needed for signing
180 */
158 char *label; 181 char *label;
159 182
183 /**
184 * Handle for the DHT PUT operation.
185 */
186 struct GNUNET_DHT_PutHandle *ph;
187
188 /**
189 * When was this PUT initiated?
190 */
191 struct GNUNET_TIME_Absolute start_date;
160}; 192};
161 193
162 194
163/** 195/**
164 * DLL 196 * The DLL for workers to retrieve open jobs that require
197 * signing of blocks.
165 */ 198 */
166static struct OpenSignJob *jobs_head; 199static struct RecordPublicationJob *sign_jobs_head;
167 200
168/** 201/**
169 * DLL 202 * See above
170 */ 203 */
171static struct OpenSignJob *jobs_tail; 204static struct RecordPublicationJob *sign_jobs_tail;
172 205
173/** 206/**
174 * DLL 207 * The DLL for workers to place jobs that are signed.
175 */ 208 */
176static struct OpenSignJob *results_head; 209static struct RecordPublicationJob *sign_results_head;
177 210
178/** 211/**
179 * DLL 212 * See above
180 */ 213 */
181static struct OpenSignJob *results_tail; 214static struct RecordPublicationJob *sign_results_tail;
182 215
183 216
184/** 217/**
185 * Handle for DHT PUT activity triggered from the namestore monitor. 218 * The DLL for jobs currently in the process of being dispatched into the DHT.
186 */ 219 */
187struct DhtPutActivity 220static struct RecordPublicationJob *dht_jobs_head;
188{
189 /**
190 * Kept in a DLL.
191 */
192 struct DhtPutActivity *next;
193 221
194 /** 222/**
195 * Kept in a DLL. 223 * See above
196 */ 224 */
197 struct DhtPutActivity *prev; 225static struct RecordPublicationJob *dht_jobs_tail;
198
199 /**
200 * Handle for the DHT PUT operation.
201 */
202 struct GNUNET_DHT_PutHandle *ph;
203
204 /**
205 * When was this PUT initiated?
206 */
207 struct GNUNET_TIME_Absolute start_date;
208 226
209};
210 227
211/** 228/**
212 * Pending operation on the namecache. 229 * Pending operation on the namecache.
@@ -268,16 +285,6 @@ static int disable_namecache;
268static struct GNUNET_NAMESTORE_ZoneIterator *namestore_iter; 285static struct GNUNET_NAMESTORE_ZoneIterator *namestore_iter;
269 286
270/** 287/**
271 * Head of iteration put activities; kept in a DLL.
272 */
273static struct DhtPutActivity *it_head;
274
275/**
276 * Tail of iteration put activities; kept in a DLL.
277 */
278static struct DhtPutActivity *it_tail;
279
280/**
281 * Number of entries in the job queue #jobs_head. 288 * Number of entries in the job queue #jobs_head.
282 */ 289 */
283static unsigned int job_queue_length; 290static unsigned int job_queue_length;
@@ -378,7 +385,7 @@ static struct CacheOperation *cop_tail;
378 385
379 386
380static void 387static void
381free_job (struct OpenSignJob *job) 388free_job (struct RecordPublicationJob *job)
382{ 389{
383 if (job->block != job->block_priv) 390 if (job->block != job->block_priv)
384 GNUNET_free (job->block_priv); 391 GNUNET_free (job->block_priv);
@@ -397,9 +404,8 @@ free_job (struct OpenSignJob *job)
397static void 404static void
398shutdown_task (void *cls) 405shutdown_task (void *cls)
399{ 406{
400 struct DhtPutActivity *ma;
401 struct CacheOperation *cop; 407 struct CacheOperation *cop;
402 struct OpenSignJob *job; 408 struct RecordPublicationJob *job;
403 409
404 (void) cls; 410 (void) cls;
405 in_shutdown = GNUNET_YES; 411 in_shutdown = GNUNET_YES;
@@ -417,36 +423,35 @@ shutdown_task (void *cls)
417 GNUNET_CONTAINER_DLL_remove (cop_head, cop_tail, cop); 423 GNUNET_CONTAINER_DLL_remove (cop_head, cop_tail, cop);
418 GNUNET_free (cop); 424 GNUNET_free (cop);
419 } 425 }
420 GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); 426 GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock));
421 while (NULL != (job = jobs_head)) 427 while (NULL != (job = sign_jobs_head))
422 { 428 {
423 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 429 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
424 "Removing incomplete jobs\n"); 430 "Removing incomplete jobs\n");
425 GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job); 431 GNUNET_CONTAINER_DLL_remove (sign_jobs_head, sign_jobs_tail, job);
426 job_queue_length--; 432 job_queue_length--;
427 free_job (job); 433 free_job (job);
428 } 434 }
429 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); 435 GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
430 GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); 436 GNUNET_assert (0 == pthread_mutex_lock (&sign_results_lock));
431 while (NULL != (job = results_head)) 437 while (NULL != (job = sign_results_head))
432 { 438 {
433 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 439 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
434 "Removing incomplete jobs\n"); 440 "Removing incomplete jobs\n");
435 GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job); 441 GNUNET_CONTAINER_DLL_remove (sign_results_head, sign_results_tail, job);
436 free_job (job); 442 free_job (job);
437 } 443 }
438 GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); 444 GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock));
439 445 while (NULL != (job = dht_jobs_head))
440 while (NULL != (ma = it_head))
441 { 446 {
442 if (NULL != ma->ph) 447 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
443 GNUNET_DHT_put_cancel (ma->ph); 448 "Removing incomplete jobs\n");
444 GNUNET_CONTAINER_DLL_remove (it_head, 449 GNUNET_CONTAINER_DLL_remove (dht_jobs_head, dht_jobs_tail, job);
445 it_tail, 450 if (NULL != job->ph)
446 ma); 451 GNUNET_DHT_put_cancel (job->ph);
447 GNUNET_free (ma); 452 free_job (job);
448 } 453 }
449 if (NULL != statistics) 454if (NULL != statistics)
450 { 455 {
451 GNUNET_STATISTICS_destroy (statistics, 456 GNUNET_STATISTICS_destroy (statistics,
452 GNUNET_NO); 457 GNUNET_NO);
@@ -763,12 +768,11 @@ check_zone_namestore_next ()
763/** 768/**
764 * Continuation called from DHT once the PUT operation is done. 769 * Continuation called from DHT once the PUT operation is done.
765 * 770 *
766 * @param cls a `struct DhtPutActivity`
767 */ 771 */
768static void 772static void
769dht_put_continuation (void *cls) 773dht_put_continuation (void *cls)
770{ 774{
771 struct DhtPutActivity *ma = cls; 775 struct RecordPublicationJob *job = cls;
772 776
773 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 777 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
774 "PUT complete\n"); 778 "PUT complete\n");
@@ -788,10 +792,10 @@ dht_put_continuation (void *cls)
788 } 792 }
789 } 793 }
790 job_queue_length--; 794 job_queue_length--;
791 GNUNET_CONTAINER_DLL_remove (it_head, 795 GNUNET_CONTAINER_DLL_remove (dht_jobs_head,
792 it_tail, 796 dht_jobs_tail,
793 ma); 797 job);
794 GNUNET_free (ma); 798 free_job (job);
795} 799}
796 800
797 801
@@ -810,8 +814,7 @@ dispatch_job (const struct GNUNET_IDENTITY_PrivateKey *key,
810 const char *label, 814 const char *label,
811 const struct GNUNET_GNSRECORD_Data *rd, 815 const struct GNUNET_GNSRECORD_Data *rd,
812 unsigned int rd_count, 816 unsigned int rd_count,
813 const struct GNUNET_TIME_Absolute expire, 817 const struct GNUNET_TIME_Absolute expire)
814 struct DhtPutActivity *ma)
815{ 818{
816 struct GNUNET_GNSRECORD_Data rd_public[rd_count]; 819 struct GNUNET_GNSRECORD_Data rd_public[rd_count];
817 struct GNUNET_GNSRECORD_Block *block; 820 struct GNUNET_GNSRECORD_Block *block;
@@ -858,18 +861,17 @@ dispatch_job (const struct GNUNET_IDENTITY_PrivateKey *key,
858 else 861 else
859 block_priv = block; 862 block_priv = block;
860 block_size = GNUNET_GNSRECORD_block_get_size (block); 863 block_size = GNUNET_GNSRECORD_block_get_size (block);
861 GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); 864 GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock));
862 struct OpenSignJob *job = GNUNET_new (struct OpenSignJob); 865 struct RecordPublicationJob *job = GNUNET_new (struct RecordPublicationJob);
863 job->block = block; 866 job->block = block;
864 job->block_size = block_size; 867 job->block_size = block_size;
865 job->block_priv = block_priv; 868 job->block_priv = block_priv;
866 job->zone = *key; 869 job->zone = *key;
867 job->ma = ma;
868 job->label = GNUNET_strdup (label); 870 job->label = GNUNET_strdup (label);
869 job->expire_pub = expire_pub; 871 job->expire_pub = expire_pub;
870 GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job); 872 GNUNET_CONTAINER_DLL_insert (sign_jobs_head, sign_jobs_tail, job);
871 GNUNET_assert (0 == pthread_cond_signal (&empty_jobs)); 873 GNUNET_assert (0 == pthread_cond_signal (&sign_jobs_cond));
872 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); 874 GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
873 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 875 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
874 "Creating job with %u record(s) for label `%s', expiration `%s'\n", 876 "Creating job with %u record(s) for label `%s', expiration `%s'\n",
875 rd_public_count, 877 rd_public_count,
@@ -887,7 +889,7 @@ static void
887initiate_put_from_pipe_trigger (void *cls) 889initiate_put_from_pipe_trigger (void *cls)
888{ 890{
889 struct GNUNET_HashCode query; 891 struct GNUNET_HashCode query;
890 struct OpenSignJob *job; 892 struct RecordPublicationJob *job;
891 const struct GNUNET_DISK_FileHandle *np_fh; 893 const struct GNUNET_DISK_FileHandle *np_fh;
892 char buf[100]; 894 char buf[100];
893 ssize_t nf_count; 895 ssize_t nf_count;
@@ -907,20 +909,37 @@ initiate_put_from_pipe_trigger (void *cls)
907 (long long) nf_count); 909 (long long) nf_count);
908 while (true) 910 while (true)
909 { 911 {
910 GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); 912 GNUNET_assert (0 == pthread_mutex_lock (&sign_results_lock));
911 if (NULL == results_head) 913 if (NULL == sign_results_head)
912 { 914 {
913 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 915 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914 "No more results. Back to sleep.\n"); 916 "No more results. Back to sleep.\n");
915 GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); 917 GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock));
916 return; 918 return;
917 } 919 }
918 job = results_head; 920 job = sign_results_head;
919 GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job); 921 GNUNET_CONTAINER_DLL_remove (sign_results_head, sign_results_tail, job);
920 GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); 922 GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock));
921 GNUNET_GNSRECORD_query_from_private_key (&job->zone, 923 GNUNET_GNSRECORD_query_from_private_key (&job->zone,
922 job->label, 924 job->label,
923 &query); 925 &query);
926 job->ph = GNUNET_DHT_put (dht_handle,
927 &query,
928 DHT_GNS_REPLICATION_LEVEL,
929 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
930 GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
931 job->block_size,
932 job->block,
933 job->expire_pub,
934 &dht_put_continuation,
935 job);
936 if (NULL == job->ph)
937 {
938 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
939 "Could not perform DHT PUT, is the DHT running?\n");
940 free_job (job);
941 return;
942 }
924 GNUNET_STATISTICS_update (statistics, 943 GNUNET_STATISTICS_update (statistics,
925 "DHT put operations initiated", 944 "DHT put operations initiated",
926 1, 945 1,
@@ -929,26 +948,8 @@ initiate_put_from_pipe_trigger (void *cls)
929 "Storing record(s) for label `%s' in DHT under key %s\n", 948 "Storing record(s) for label `%s' in DHT under key %s\n",
930 job->label, 949 job->label,
931 GNUNET_h2s (&query)); 950 GNUNET_h2s (&query));
932 job->ma->ph = GNUNET_DHT_put (dht_handle,
933 &query,
934 DHT_GNS_REPLICATION_LEVEL,
935 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
936 GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
937 job->block_size,
938 job->block,
939 job->expire_pub,
940 &dht_put_continuation,
941 job->ma);
942 if (NULL == job->ma->ph)
943 {
944 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
945 "Could not perform DHT PUT, is the DHT running?\n");
946 GNUNET_free (job->ma);
947 free_job (job);
948 return;
949 }
950 refresh_block (job->block_priv); 951 refresh_block (job->block_priv);
951 free_job (job); 952 GNUNET_CONTAINER_DLL_insert (dht_jobs_head, dht_jobs_tail, job);
952 } 953 }
953} 954}
954 955
@@ -1042,8 +1043,6 @@ handle_record (void *cls,
1042 const struct GNUNET_GNSRECORD_Data *rd, 1043 const struct GNUNET_GNSRECORD_Data *rd,
1043 struct GNUNET_TIME_Absolute expire) 1044 struct GNUNET_TIME_Absolute expire)
1044{ 1045{
1045 struct DhtPutActivity *ma;
1046
1047 (void) cls; 1046 (void) cls;
1048 ns_iteration_left--; 1047 ns_iteration_left--;
1049 if (0 == rd_count) 1048 if (0 == rd_count)
@@ -1072,16 +1071,11 @@ handle_record (void *cls,
1072 put_cnt++; 1071 put_cnt++;
1073 if (0 == put_cnt % DELTA_INTERVAL) 1072 if (0 == put_cnt % DELTA_INTERVAL)
1074 update_velocity (DELTA_INTERVAL); 1073 update_velocity (DELTA_INTERVAL);
1075 ma = GNUNET_new (struct DhtPutActivity);
1076 dispatch_job (key, 1074 dispatch_job (key,
1077 label, 1075 label,
1078 rd, 1076 rd,
1079 rd_count, 1077 rd_count,
1080 expire, 1078 expire);
1081 ma);
1082 GNUNET_CONTAINER_DLL_insert_tail (it_head,
1083 it_tail,
1084 ma);
1085 job_queue_length++; 1079 job_queue_length++;
1086 if (job_queue_length >= JOB_QUEUE_LIMIT) 1080 if (job_queue_length >= JOB_QUEUE_LIMIT)
1087 { 1081 {
@@ -1136,16 +1130,13 @@ publish_zone_dht_start (void *cls)
1136 * @param label label to store under 1130 * @param label label to store under
1137 * @param rd_public public record data 1131 * @param rd_public public record data
1138 * @param rd_public_count number of records in @a rd_public 1132 * @param rd_public_count number of records in @a rd_public
1139 * @param ma handle for the PUT operation
1140 * @return DHT PUT handle, NULL on error
1141 */ 1133 */
1142static void 1134static void
1143dispatch_job_monitor (const struct GNUNET_IDENTITY_PrivateKey *key, 1135dispatch_job_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
1144 const char *label, 1136 const char *label,
1145 const struct GNUNET_GNSRECORD_Data *rd, 1137 const struct GNUNET_GNSRECORD_Data *rd,
1146 unsigned int rd_count, 1138 unsigned int rd_count,
1147 struct GNUNET_TIME_Absolute expire, 1139 struct GNUNET_TIME_Absolute expire)
1148 struct DhtPutActivity *ma)
1149{ 1140{
1150 struct GNUNET_GNSRECORD_Data rd_public[rd_count]; 1141 struct GNUNET_GNSRECORD_Data rd_public[rd_count];
1151 struct GNUNET_GNSRECORD_Block *block; 1142 struct GNUNET_GNSRECORD_Block *block;
@@ -1192,18 +1183,17 @@ dispatch_job_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
1192 else 1183 else
1193 block_priv = block; 1184 block_priv = block;
1194 block_size = GNUNET_GNSRECORD_block_get_size (block); 1185 block_size = GNUNET_GNSRECORD_block_get_size (block);
1195 GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); 1186 GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock));
1196 struct OpenSignJob *job = GNUNET_new (struct OpenSignJob); 1187 struct RecordPublicationJob *job = GNUNET_new (struct RecordPublicationJob);
1197 job->block = block; 1188 job->block = block;
1198 job->block_size = block_size; 1189 job->block_size = block_size;
1199 job->block_priv = block_priv; 1190 job->block_priv = block_priv;
1200 job->zone = *key; 1191 job->zone = *key;
1201 job->ma = ma;
1202 job->label = GNUNET_strdup (label); 1192 job->label = GNUNET_strdup (label);
1203 job->expire_pub = expire_pub; 1193 job->expire_pub = expire_pub;
1204 GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job); 1194 GNUNET_CONTAINER_DLL_insert (sign_jobs_head, sign_jobs_tail, job);
1205 GNUNET_assert (0 == pthread_cond_signal (&empty_jobs)); 1195 GNUNET_assert (0 == pthread_cond_signal (&sign_jobs_cond));
1206 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); 1196 GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
1207} 1197}
1208 1198
1209 1199
@@ -1226,8 +1216,6 @@ handle_monitor_event (void *cls,
1226 const struct GNUNET_GNSRECORD_Data *rd, 1216 const struct GNUNET_GNSRECORD_Data *rd,
1227 struct GNUNET_TIME_Absolute expire) 1217 struct GNUNET_TIME_Absolute expire)
1228{ 1218{
1229 struct DhtPutActivity *ma;
1230
1231 (void) cls; 1219 (void) cls;
1232 GNUNET_STATISTICS_update (statistics, 1220 GNUNET_STATISTICS_update (statistics,
1233 "Namestore monitor events received", 1221 "Namestore monitor events received",
@@ -1243,16 +1231,11 @@ handle_monitor_event (void *cls,
1243 1); 1231 1);
1244 return; /* nothing to do */ 1232 return; /* nothing to do */
1245 } 1233 }
1246 ma = GNUNET_new (struct DhtPutActivity);
1247 dispatch_job_monitor (zone, 1234 dispatch_job_monitor (zone,
1248 label, 1235 label,
1249 rd, 1236 rd,
1250 rd_count, 1237 rd_count,
1251 expire, 1238 expire);
1252 ma);
1253 GNUNET_CONTAINER_DLL_insert_tail (it_head,
1254 it_tail,
1255 ma);
1256 job_queue_length++; 1239 job_queue_length++;
1257 if (job_queue_length >= JOB_QUEUE_LIMIT) 1240 if (job_queue_length >= JOB_QUEUE_LIMIT)
1258 { 1241 {
@@ -1287,31 +1270,31 @@ handle_monitor_error (void *cls)
1287static void* 1270static void*
1288sign_worker (void *cls) 1271sign_worker (void *cls)
1289{ 1272{
1290 struct OpenSignJob *job; 1273 struct RecordPublicationJob *job;
1291 const struct GNUNET_DISK_FileHandle *fh; 1274 const struct GNUNET_DISK_FileHandle *fh;
1292 1275
1293 fh = GNUNET_DISK_pipe_handle (notification_pipe, GNUNET_DISK_PIPE_END_WRITE); 1276 fh = GNUNET_DISK_pipe_handle (notification_pipe, GNUNET_DISK_PIPE_END_WRITE);
1294 while (GNUNET_YES != in_shutdown) 1277 while (GNUNET_YES != in_shutdown)
1295 { 1278 {
1296 GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock)); 1279 GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock));
1297 while (NULL == jobs_head) 1280 while (NULL == sign_jobs_head)
1298 GNUNET_assert (0 == pthread_cond_wait (&empty_jobs, &jobs_lock)); 1281 GNUNET_assert (0 == pthread_cond_wait (&sign_jobs_cond, &sign_jobs_lock));
1299 if (GNUNET_YES == in_shutdown) 1282 if (GNUNET_YES == in_shutdown)
1300 { 1283 {
1301 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); 1284 GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
1302 return NULL; 1285 return NULL;
1303 } 1286 }
1304 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1287 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1305 "Taking on Job for %s\n", jobs_head->label); 1288 "Taking on Job for %s\n", sign_jobs_head->label);
1306 job = jobs_head; 1289 job = sign_jobs_head;
1307 GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job); 1290 GNUNET_CONTAINER_DLL_remove (sign_jobs_head, sign_jobs_tail, job);
1308 GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock)); 1291 GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
1309 GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block); 1292 GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block);
1310 if (job->block != job->block_priv) 1293 if (job->block != job->block_priv)
1311 GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block_priv); 1294 GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block_priv);
1312 GNUNET_assert (0 == pthread_mutex_lock (&results_lock)); 1295 GNUNET_assert (0 == pthread_mutex_lock (&sign_results_lock));
1313 GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job); 1296 GNUNET_CONTAINER_DLL_insert (sign_results_head, sign_results_tail, job);
1314 GNUNET_assert (0 == pthread_mutex_unlock (&results_lock)); 1297 GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock));
1315 job = NULL; 1298 job = NULL;
1316 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1299 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1317 "Done, notifying main thread through pipe!\n"); 1300 "Done, notifying main thread through pipe!\n");