diff options
author | Christian Grothoff <christian@grothoff.org> | 2012-01-29 17:38:57 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2012-01-29 17:38:57 +0000 |
commit | 4419d11120f7cdb592e612993ef1e1df2d023e8e (patch) | |
tree | 0f142b69f9f7229eaad1f93ff8a46a7315650060 /src/fs/fs_dirmetascan.c | |
parent | 415c958a01ab6dac73b0109becbe8657b525af9c (diff) | |
download | gnunet-4419d11120f7cdb592e612993ef1e1df2d023e8e.tar.gz gnunet-4419d11120f7cdb592e612993ef1e1df2d023e8e.zip |
-towards resolving thread-issue
Diffstat (limited to 'src/fs/fs_dirmetascan.c')
-rw-r--r-- | src/fs/fs_dirmetascan.c | 585 |
1 files changed, 30 insertions, 555 deletions
diff --git a/src/fs/fs_dirmetascan.c b/src/fs/fs_dirmetascan.c index 50b8273d3..e4ede571d 100644 --- a/src/fs/fs_dirmetascan.c +++ b/src/fs/fs_dirmetascan.c | |||
@@ -21,7 +21,7 @@ | |||
21 | /** | 21 | /** |
22 | * @file fs/fs_dirmetascan.c | 22 | * @file fs/fs_dirmetascan.c |
23 | * @brief code to asynchronously build a 'struct GNUNET_FS_ShareTreeItem' | 23 | * @brief code to asynchronously build a 'struct GNUNET_FS_ShareTreeItem' |
24 | * from an on-disk directory for publishing | 24 | * from an on-disk directory for publishing; use the 'gnunet-helper-fs-publish'. |
25 | * @author LRN | 25 | * @author LRN |
26 | * @author Christian Grothoff | 26 | * @author Christian Grothoff |
27 | */ | 27 | */ |
@@ -39,13 +39,9 @@ struct GNUNET_FS_DirScanner | |||
39 | { | 39 | { |
40 | 40 | ||
41 | /** | 41 | /** |
42 | * A thread object for the scanner thread. | 42 | * Helper process. |
43 | */ | 43 | */ |
44 | #if WINDOWS | 44 | struct GNUNET_HELPER_Handle *helper; |
45 | HANDLE thread; | ||
46 | #else | ||
47 | pthread_t thread; | ||
48 | #endif | ||
49 | 45 | ||
50 | /** | 46 | /** |
51 | * Expanded filename (as given by the scan initiator). | 47 | * Expanded filename (as given by the scan initiator). |
@@ -54,42 +50,9 @@ struct GNUNET_FS_DirScanner | |||
54 | char *filename_expanded; | 50 | char *filename_expanded; |
55 | 51 | ||
56 | /** | 52 | /** |
57 | * List of libextractor plugins to use for extracting. | 53 | * Second argument to helper process. |
58 | * Initialized when the scan starts, removed when it finishes. | ||
59 | */ | ||
60 | struct EXTRACTOR_PluginList *plugins; | ||
61 | |||
62 | /** | ||
63 | * A pipe transfer signals to the scanner. | ||
64 | */ | ||
65 | struct GNUNET_DISK_PipeHandle *stop_pipe; | ||
66 | |||
67 | /** | ||
68 | * A pipe end to read signals from. | ||
69 | */ | ||
70 | const struct GNUNET_DISK_FileHandle *stop_read; | ||
71 | |||
72 | /** | ||
73 | * A pipe end to read signals from. | ||
74 | */ | 54 | */ |
75 | const struct GNUNET_DISK_FileHandle *stop_write; | 55 | char *ex_arg; |
76 | |||
77 | /** | ||
78 | * The pipe that is used to read progress messages. Only closed | ||
79 | * after the scanner thread is finished. | ||
80 | */ | ||
81 | struct GNUNET_DISK_PipeHandle *progress_pipe; | ||
82 | |||
83 | /** | ||
84 | * The end of the pipe that is used to read progress messages. | ||
85 | */ | ||
86 | const struct GNUNET_DISK_FileHandle *progress_read; | ||
87 | |||
88 | /** | ||
89 | * Handle of the pipe end into which the progress messages are written | ||
90 | * The initiator MUST keep it alive until the scanner thread is finished. | ||
91 | */ | ||
92 | const struct GNUNET_DISK_FileHandle *progress_write; | ||
93 | 56 | ||
94 | /** | 57 | /** |
95 | * The function that will be called every time there's a progress | 58 | * The function that will be called every time there's a progress |
@@ -101,11 +64,6 @@ struct GNUNET_FS_DirScanner | |||
101 | * A closure for progress_callback. | 64 | * A closure for progress_callback. |
102 | */ | 65 | */ |
103 | void *progress_callback_cls; | 66 | void *progress_callback_cls; |
104 | |||
105 | /** | ||
106 | * A task for reading progress messages from the scanner. | ||
107 | */ | ||
108 | GNUNET_SCHEDULER_TaskIdentifier progress_read_task; | ||
109 | 67 | ||
110 | /** | 68 | /** |
111 | * After the scan is finished, it will contain a pointer to the | 69 | * After the scan is finished, it will contain a pointer to the |
@@ -115,13 +73,6 @@ struct GNUNET_FS_DirScanner | |||
115 | */ | 73 | */ |
116 | struct GNUNET_FS_ShareTreeItem *toplevel; | 74 | struct GNUNET_FS_ShareTreeItem *toplevel; |
117 | 75 | ||
118 | /** | ||
119 | * 1 if the scanner should stop, 0 otherwise. Set in response | ||
120 | * to communication errors or when the initiator wants the scanning | ||
121 | * process to stop. | ||
122 | */ | ||
123 | int do_stop; | ||
124 | |||
125 | }; | 76 | }; |
126 | 77 | ||
127 | 78 | ||
@@ -134,36 +85,14 @@ struct GNUNET_FS_DirScanner | |||
134 | void | 85 | void |
135 | GNUNET_FS_directory_scan_abort (struct GNUNET_FS_DirScanner *ds) | 86 | GNUNET_FS_directory_scan_abort (struct GNUNET_FS_DirScanner *ds) |
136 | { | 87 | { |
137 | static char c = 1; | 88 | /* terminate helper */ |
138 | 89 | GNUNET_HELPER_stop (ds->helper); | |
139 | /* signal shutdown to other thread */ | ||
140 | (void) GNUNET_DISK_file_write (ds->stop_write, &c, 1); | ||
141 | GNUNET_DISK_pipe_close_end (ds->stop_pipe, GNUNET_DISK_PIPE_END_WRITE); | ||
142 | |||
143 | /* stop reading from progress */ | ||
144 | if (ds->progress_read_task != GNUNET_SCHEDULER_NO_TASK) | ||
145 | { | ||
146 | GNUNET_SCHEDULER_cancel (ds->progress_read_task); | ||
147 | ds->progress_read_task = GNUNET_SCHEDULER_NO_TASK; | ||
148 | } | ||
149 | GNUNET_DISK_pipe_close_end (ds->progress_pipe, GNUNET_DISK_PIPE_END_READ); | ||
150 | |||
151 | /* wait for other thread to terminate */ | ||
152 | #if WINDOWS | ||
153 | WaitForSingleObject (ds->thread, INFINITE); | ||
154 | CloseHandle (ds->thread); | ||
155 | #else | ||
156 | pthread_join (ds->thread, NULL); | ||
157 | pthread_detach (ds->thread); | ||
158 | #endif | ||
159 | 90 | ||
160 | /* free resources */ | 91 | /* free resources */ |
161 | GNUNET_DISK_pipe_close (ds->stop_pipe); | ||
162 | GNUNET_DISK_pipe_close (ds->progress_pipe); | ||
163 | if (NULL != ds->toplevel) | 92 | if (NULL != ds->toplevel) |
164 | GNUNET_FS_share_tree_free (ds->toplevel); | 93 | GNUNET_FS_share_tree_free (ds->toplevel); |
165 | if (NULL != ds->plugins) | 94 | GNUNET_free (ds->ex_arg); |
166 | EXTRACTOR_plugin_remove_all (ds->plugins); | 95 | GNUNET_free (ds->filename_expanded); |
167 | GNUNET_free (ds); | 96 | GNUNET_free (ds); |
168 | } | 97 | } |
169 | 98 | ||
@@ -182,7 +111,7 @@ GNUNET_FS_directory_scan_get_result (struct GNUNET_FS_DirScanner *ds) | |||
182 | struct GNUNET_FS_ShareTreeItem *result; | 111 | struct GNUNET_FS_ShareTreeItem *result; |
183 | 112 | ||
184 | /* check that we're actually done */ | 113 | /* check that we're actually done */ |
185 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == ds->progress_read_task); | 114 | GNUNET_assert (NULL == ds->helper); |
186 | /* preserve result */ | 115 | /* preserve result */ |
187 | result = ds->toplevel; | 116 | result = ds->toplevel; |
188 | ds->toplevel = NULL; | 117 | ds->toplevel = NULL; |
@@ -192,442 +121,27 @@ GNUNET_FS_directory_scan_get_result (struct GNUNET_FS_DirScanner *ds) | |||
192 | 121 | ||
193 | 122 | ||
194 | /** | 123 | /** |
195 | * Write 'size' bytes from 'buf' into 'out'. | ||
196 | * | ||
197 | * @param in pipe to write to | ||
198 | * @param buf buffer with data to write | ||
199 | * @param size number of bytes to write | ||
200 | * @return GNUNET_OK on success, GNUNET_SYSERR on error | ||
201 | */ | ||
202 | static int | ||
203 | write_all (const struct GNUNET_DISK_FileHandle *out, | ||
204 | const void *buf, | ||
205 | size_t size) | ||
206 | { | ||
207 | const char *cbuf = buf; | ||
208 | size_t total; | ||
209 | ssize_t wr; | ||
210 | |||
211 | total = 0; | ||
212 | do | ||
213 | { | ||
214 | wr = GNUNET_DISK_file_write (out, | ||
215 | &cbuf[total], | ||
216 | size - total); | ||
217 | if (wr > 0) | ||
218 | total += wr; | ||
219 | } while ( (wr > 0) && (total < size) ); | ||
220 | if (wr <= 0) | ||
221 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
222 | "Failed to write to inter thread communication pipe: %s\n", | ||
223 | strerror (errno)); | ||
224 | return (total == size) ? GNUNET_OK : GNUNET_SYSERR; | ||
225 | } | ||
226 | |||
227 | |||
228 | /** | ||
229 | * Write progress message. | ||
230 | * | ||
231 | * @param ds | ||
232 | * @param filename name of the file to transmit, never NULL | ||
233 | * @param is_directory GNUNET_YES for directory, GNUNET_NO for file, GNUNET_SYSERR for neither | ||
234 | * @param reason reason for the progress call | ||
235 | * @return GNUNET_SYSERR to stop scanning (the pipe was broken somehow) | ||
236 | */ | ||
237 | static int | ||
238 | write_progress (struct GNUNET_FS_DirScanner *ds, | ||
239 | const char *filename, | ||
240 | int is_directory, | ||
241 | enum GNUNET_FS_DirScannerProgressUpdateReason reason) | ||
242 | { | ||
243 | size_t slen; | ||
244 | |||
245 | slen = strlen (filename) + 1; | ||
246 | if ( (GNUNET_OK != | ||
247 | write_all (ds->progress_write, | ||
248 | &reason, | ||
249 | sizeof (reason))) || | ||
250 | (GNUNET_OK != | ||
251 | write_all (ds->progress_write, | ||
252 | &slen, | ||
253 | sizeof (slen))) || | ||
254 | (GNUNET_OK != | ||
255 | write_all (ds->progress_write, | ||
256 | filename, | ||
257 | slen)) || | ||
258 | (GNUNET_OK != | ||
259 | write_all (ds->progress_write, | ||
260 | &is_directory, | ||
261 | sizeof (is_directory))) ) | ||
262 | return GNUNET_SYSERR; | ||
263 | return GNUNET_OK; | ||
264 | } | ||
265 | |||
266 | |||
267 | /** | ||
268 | * Called every now and then by the scanner thread to check | ||
269 | * if we're being aborted. | ||
270 | * | ||
271 | * @param ds scanner context | ||
272 | * @return GNUNET_OK to continue, GNUNET_SYSERR to stop | ||
273 | */ | ||
274 | static int | ||
275 | test_thread_stop (struct GNUNET_FS_DirScanner *ds) | ||
276 | { | ||
277 | char c; | ||
278 | |||
279 | if ( (GNUNET_DISK_file_read_non_blocking (ds->stop_read, &c, 1) == 1) || | ||
280 | (EAGAIN != errno) ) | ||
281 | return GNUNET_SYSERR; | ||
282 | return GNUNET_OK; | ||
283 | } | ||
284 | |||
285 | |||
286 | /** | ||
287 | * Function called to (recursively) add all of the files in the | ||
288 | * directory to the tree. Called by the directory scanner to initiate | ||
289 | * the scan. Does NOT yet add any metadata. | ||
290 | * | ||
291 | * @param ds directory scanner context to use | ||
292 | * @param filename file or directory to scan | ||
293 | * @param dst where to store the resulting share tree item | ||
294 | * @return GNUNET_OK on success, GNUNET_SYSERR on error | ||
295 | */ | ||
296 | static int | ||
297 | preprocess_file (struct GNUNET_FS_DirScanner *ds, | ||
298 | const char *filename, | ||
299 | struct GNUNET_FS_ShareTreeItem **dst); | ||
300 | |||
301 | |||
302 | /** | ||
303 | * Closure for the 'scan_callback' | ||
304 | */ | ||
305 | struct RecursionContext | ||
306 | { | ||
307 | /** | ||
308 | * Global scanner context. | ||
309 | */ | ||
310 | struct GNUNET_FS_DirScanner *ds; | ||
311 | |||
312 | /** | ||
313 | * Parent to add the files to. | ||
314 | */ | ||
315 | struct GNUNET_FS_ShareTreeItem *parent; | ||
316 | |||
317 | /** | ||
318 | * Flag to set to GNUNET_YES on serious errors. | ||
319 | */ | ||
320 | int stop; | ||
321 | }; | ||
322 | |||
323 | |||
324 | /** | ||
325 | * Function called by the directory iterator to (recursively) add all | ||
326 | * of the files in the directory to the tree. Called by the directory | ||
327 | * scanner to initiate the scan. Does NOT yet add any metadata. | ||
328 | * | ||
329 | * @param cls the 'struct RecursionContext' | ||
330 | * @param filename file or directory to scan | ||
331 | * @return GNUNET_OK on success, GNUNET_SYSERR on error | ||
332 | */ | ||
333 | static int | ||
334 | scan_callback (void *cls, | ||
335 | const char *filename) | ||
336 | { | ||
337 | struct RecursionContext *rc = cls; | ||
338 | struct GNUNET_FS_ShareTreeItem *chld; | ||
339 | |||
340 | if (GNUNET_OK != | ||
341 | preprocess_file (rc->ds, | ||
342 | filename, | ||
343 | &chld)) | ||
344 | { | ||
345 | rc->stop = GNUNET_YES; | ||
346 | return GNUNET_SYSERR; | ||
347 | } | ||
348 | chld->parent = rc->parent; | ||
349 | GNUNET_CONTAINER_DLL_insert (rc->parent->children_head, | ||
350 | rc->parent->children_tail, | ||
351 | chld); | ||
352 | return GNUNET_OK; | ||
353 | } | ||
354 | |||
355 | |||
356 | /** | ||
357 | * Function called to (recursively) add all of the files in the | ||
358 | * directory to the tree. Called by the directory scanner to initiate | ||
359 | * the scan. Does NOT yet add any metadata. | ||
360 | * | ||
361 | * @param ds directory scanner context to use | ||
362 | * @param filename file or directory to scan | ||
363 | * @param dst where to store the resulting share tree item | ||
364 | * @return GNUNET_OK on success, GNUNET_SYSERR on error | ||
365 | */ | ||
366 | static int | ||
367 | preprocess_file (struct GNUNET_FS_DirScanner *ds, | ||
368 | const char *filename, | ||
369 | struct GNUNET_FS_ShareTreeItem **dst) | ||
370 | { | ||
371 | struct GNUNET_FS_ShareTreeItem *item; | ||
372 | struct stat sbuf; | ||
373 | |||
374 | if (0 != STAT (filename, &sbuf)) | ||
375 | { | ||
376 | /* If the file doesn't exist (or is not stat-able for any other reason) | ||
377 | skip it (but report it), but do continue. */ | ||
378 | if (GNUNET_OK != | ||
379 | write_progress (ds, filename, GNUNET_SYSERR, | ||
380 | GNUNET_FS_DIRSCANNER_DOES_NOT_EXIST)) | ||
381 | return GNUNET_SYSERR; | ||
382 | return GNUNET_OK; | ||
383 | } | ||
384 | |||
385 | /* Report the progress */ | ||
386 | if (GNUNET_OK != | ||
387 | write_progress (ds, | ||
388 | filename, | ||
389 | S_ISDIR (sbuf.st_mode) ? GNUNET_YES : GNUNET_NO, | ||
390 | GNUNET_FS_DIRSCANNER_FILE_START)) | ||
391 | return GNUNET_SYSERR; | ||
392 | item = GNUNET_malloc (sizeof (struct GNUNET_FS_ShareTreeItem)); | ||
393 | item->meta = GNUNET_CONTAINER_meta_data_create (); | ||
394 | item->filename = GNUNET_strdup (filename); | ||
395 | item->short_filename = GNUNET_strdup (GNUNET_STRINGS_get_short_name (filename)); | ||
396 | item->is_directory = (S_ISDIR (sbuf.st_mode)) ? GNUNET_YES : GNUNET_NO; | ||
397 | item->file_size = (uint64_t) sbuf.st_size; | ||
398 | if (item->is_directory) | ||
399 | { | ||
400 | struct RecursionContext rc; | ||
401 | |||
402 | rc.parent = item; | ||
403 | rc.ds = ds; | ||
404 | rc.stop = GNUNET_NO; | ||
405 | GNUNET_DISK_directory_scan (filename, | ||
406 | &scan_callback, | ||
407 | &rc); | ||
408 | if ( (rc.stop == GNUNET_YES) || | ||
409 | (GNUNET_OK != | ||
410 | test_thread_stop (ds)) ) | ||
411 | { | ||
412 | GNUNET_FS_share_tree_free (item); | ||
413 | return GNUNET_SYSERR; | ||
414 | } | ||
415 | } | ||
416 | /* Report the progress */ | ||
417 | if (GNUNET_OK != | ||
418 | write_progress (ds, | ||
419 | filename, | ||
420 | S_ISDIR (sbuf.st_mode) ? GNUNET_YES : GNUNET_NO, | ||
421 | GNUNET_FS_DIRSCANNER_SUBTREE_COUNTED)) | ||
422 | { | ||
423 | GNUNET_FS_share_tree_free (item); | ||
424 | return GNUNET_SYSERR; | ||
425 | } | ||
426 | *dst = item; | ||
427 | return GNUNET_OK; | ||
428 | } | ||
429 | |||
430 | |||
431 | /** | ||
432 | * Extract metadata from files. | ||
433 | * | ||
434 | * @param ds directory scanner context | ||
435 | * @param item entry we are processing | ||
436 | * @return GNUNET_OK on success, GNUNET_SYSERR on fatal errors | ||
437 | */ | ||
438 | static int | ||
439 | extract_files (struct GNUNET_FS_DirScanner *ds, | ||
440 | struct GNUNET_FS_ShareTreeItem *item) | ||
441 | { | ||
442 | if (item->is_directory) | ||
443 | { | ||
444 | /* for directories, we simply only descent, no extraction, no | ||
445 | progress reporting */ | ||
446 | struct GNUNET_FS_ShareTreeItem *pos; | ||
447 | |||
448 | for (pos = item->children_head; NULL != pos; pos = pos->next) | ||
449 | if (GNUNET_OK != | ||
450 | extract_files (ds, pos)) | ||
451 | return GNUNET_SYSERR; | ||
452 | return GNUNET_OK; | ||
453 | } | ||
454 | |||
455 | /* this is the expensive operation, *afterwards* we'll check for aborts */ | ||
456 | fprintf (stderr, "\tCalling extract on `%s'\n", item->filename); | ||
457 | GNUNET_FS_meta_data_extract_from_file (item->meta, | ||
458 | item->filename, | ||
459 | ds->plugins); | ||
460 | fprintf (stderr, "\tExtract `%s' done\n", item->filename); | ||
461 | |||
462 | /* having full filenames is too dangerous; always make sure we clean them up */ | ||
463 | GNUNET_CONTAINER_meta_data_delete (item->meta, | ||
464 | EXTRACTOR_METATYPE_FILENAME, | ||
465 | NULL, 0); | ||
466 | GNUNET_CONTAINER_meta_data_insert (item->meta, "<libgnunetfs>", | ||
467 | EXTRACTOR_METATYPE_FILENAME, | ||
468 | EXTRACTOR_METAFORMAT_UTF8, "text/plain", | ||
469 | item->short_filename, | ||
470 | strlen (item->short_filename) + 1); | ||
471 | /* check for abort */ | ||
472 | if (GNUNET_OK != | ||
473 | test_thread_stop (ds)) | ||
474 | return GNUNET_SYSERR; | ||
475 | |||
476 | /* Report the progress */ | ||
477 | if (GNUNET_OK != | ||
478 | write_progress (ds, | ||
479 | item->filename, | ||
480 | GNUNET_NO, | ||
481 | GNUNET_FS_DIRSCANNER_EXTRACT_FINISHED)) | ||
482 | return GNUNET_SYSERR; | ||
483 | return GNUNET_OK; | ||
484 | } | ||
485 | |||
486 | |||
487 | /** | ||
488 | * The function from which the scanner thread starts | ||
489 | * | ||
490 | * @param cls the 'struct GNUNET_FS_DirScanner' | ||
491 | * @return 0/NULL | ||
492 | */ | ||
493 | #if WINDOWS | ||
494 | DWORD | ||
495 | #else | ||
496 | static void * | ||
497 | #endif | ||
498 | run_directory_scan_thread (void *cls) | ||
499 | { | ||
500 | struct GNUNET_FS_DirScanner *ds = cls; | ||
501 | |||
502 | if (GNUNET_OK != preprocess_file (ds, | ||
503 | ds->filename_expanded, | ||
504 | &ds->toplevel)) | ||
505 | { | ||
506 | (void) write_progress (ds, "", GNUNET_SYSERR, GNUNET_FS_DIRSCANNER_INTERNAL_ERROR); | ||
507 | GNUNET_DISK_pipe_close_end (ds->progress_pipe, GNUNET_DISK_PIPE_END_WRITE); | ||
508 | return 0; | ||
509 | } | ||
510 | if (GNUNET_OK != | ||
511 | write_progress (ds, "", GNUNET_SYSERR, GNUNET_FS_DIRSCANNER_ALL_COUNTED)) | ||
512 | { | ||
513 | GNUNET_DISK_pipe_close_end (ds->progress_pipe, GNUNET_DISK_PIPE_END_WRITE); | ||
514 | return 0; | ||
515 | } | ||
516 | if (GNUNET_OK != | ||
517 | extract_files (ds, ds->toplevel)) | ||
518 | { | ||
519 | (void) write_progress (ds, "", GNUNET_SYSERR, GNUNET_FS_DIRSCANNER_INTERNAL_ERROR); | ||
520 | GNUNET_DISK_pipe_close_end (ds->progress_pipe, GNUNET_DISK_PIPE_END_WRITE); | ||
521 | return 0; | ||
522 | } | ||
523 | (void) write_progress (ds, "", GNUNET_SYSERR, GNUNET_FS_DIRSCANNER_FINISHED); | ||
524 | GNUNET_DISK_pipe_close_end (ds->progress_pipe, GNUNET_DISK_PIPE_END_WRITE); | ||
525 | return 0; | ||
526 | } | ||
527 | |||
528 | |||
529 | /** | ||
530 | * Read 'size' bytes from 'in' into 'buf'. | ||
531 | * | ||
532 | * @param in pipe to read from | ||
533 | * @param buf buffer to read to | ||
534 | * @param size number of bytes to read | ||
535 | * @return GNUNET_OK on success, GNUNET_SYSERR on error | ||
536 | */ | ||
537 | static int | ||
538 | read_all (const struct GNUNET_DISK_FileHandle *in, | ||
539 | char *buf, | ||
540 | size_t size) | ||
541 | { | ||
542 | size_t total; | ||
543 | ssize_t rd; | ||
544 | |||
545 | total = 0; | ||
546 | do | ||
547 | { | ||
548 | rd = GNUNET_DISK_file_read (in, | ||
549 | &buf[total], | ||
550 | size - total); | ||
551 | if (rd > 0) | ||
552 | total += rd; | ||
553 | } while ( (rd > 0) && (total < size) ); | ||
554 | if (rd <= 0) | ||
555 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
556 | "Failed to read from inter thread communication pipe: %s\n", | ||
557 | strerror (errno)); | ||
558 | return (total == size) ? GNUNET_OK : GNUNET_SYSERR; | ||
559 | } | ||
560 | |||
561 | |||
562 | /** | ||
563 | * Called every time there is data to read from the scanner. | 124 | * Called every time there is data to read from the scanner. |
564 | * Calls the scanner progress handler. | 125 | * Calls the scanner progress handler. |
565 | * | 126 | * |
566 | * @param cls the closure (directory scanner object) | 127 | * @param cls the closure (directory scanner object) |
567 | * @param tc task context in which the task is running | 128 | * @param client always NULL |
129 | * @param msg message from the helper process | ||
568 | */ | 130 | */ |
569 | static void | 131 | static void |
570 | read_progress_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 132 | process_helper_msgs (void *cls, |
133 | void *client, | ||
134 | const struct GNUNET_MessageHeader *msg) | ||
571 | { | 135 | { |
572 | struct GNUNET_FS_DirScanner *ds = cls; | 136 | struct GNUNET_FS_DirScanner *ds = cls; |
137 | ds++; | ||
138 | #if 0 | ||
573 | enum GNUNET_FS_DirScannerProgressUpdateReason reason; | 139 | enum GNUNET_FS_DirScannerProgressUpdateReason reason; |
574 | size_t filename_len; | 140 | size_t filename_len; |
575 | int is_directory; | 141 | int is_directory; |
576 | char *filename; | 142 | char *filename; |
577 | 143 | ||
578 | ds->progress_read_task = GNUNET_SCHEDULER_NO_TASK; | 144 | /* Process message. If message is malformed or can't be read, end the scanner */ |
579 | if (0 == (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) | ||
580 | { | ||
581 | ds->progress_read_task | ||
582 | = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, | ||
583 | ds->progress_read, &read_progress_task, | ||
584 | ds); | ||
585 | return; | ||
586 | } | ||
587 | |||
588 | /* Read one message. If message is malformed or can't be read, end the scanner */ | ||
589 | filename = NULL; | ||
590 | if ( (GNUNET_OK != | ||
591 | read_all (ds->progress_read, | ||
592 | (char*) &reason, | ||
593 | sizeof (reason))) || | ||
594 | (reason < GNUNET_FS_DIRSCANNER_FILE_START) || | ||
595 | (reason > GNUNET_FS_DIRSCANNER_INTERNAL_ERROR) || | ||
596 | (GNUNET_OK != | ||
597 | read_all (ds->progress_read, | ||
598 | (char*) &filename_len, | ||
599 | sizeof (size_t))) || | ||
600 | (filename_len == 0) || | ||
601 | (filename_len > PATH_MAX) || | ||
602 | (GNUNET_OK != | ||
603 | read_all (ds->progress_read, | ||
604 | filename = GNUNET_malloc (filename_len), | ||
605 | filename_len)) || | ||
606 | (filename[filename_len-1] != '\0') || | ||
607 | (GNUNET_OK != | ||
608 | read_all (ds->progress_read, | ||
609 | (char*) &is_directory, | ||
610 | sizeof (is_directory))) ) | ||
611 | { | ||
612 | /* IPC error, complain, signal client and stop reading | ||
613 | from the pipe */ | ||
614 | GNUNET_break (0); | ||
615 | ds->progress_callback (ds->progress_callback_cls, ds, | ||
616 | NULL, GNUNET_SYSERR, | ||
617 | GNUNET_FS_DIRSCANNER_INTERNAL_ERROR); | ||
618 | GNUNET_free_non_null (filename); | ||
619 | return; | ||
620 | } | ||
621 | /* schedule task to keep reading (done here in case client calls | ||
622 | abort or something similar) */ | ||
623 | if ( (reason != GNUNET_FS_DIRSCANNER_FINISHED) && | ||
624 | (reason != GNUNET_FS_DIRSCANNER_INTERNAL_ERROR) ) | ||
625 | { | ||
626 | ds->progress_read_task | ||
627 | = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, | ||
628 | ds->progress_read, | ||
629 | &read_progress_task, ds); | ||
630 | } | ||
631 | /* read successfully, notify client about progress */ | 145 | /* read successfully, notify client about progress */ |
632 | ds->progress_callback (ds->progress_callback_cls, | 146 | ds->progress_callback (ds->progress_callback_cls, |
633 | ds, | 147 | ds, |
@@ -635,6 +149,7 @@ read_progress_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
635 | is_directory, | 149 | is_directory, |
636 | reason); | 150 | reason); |
637 | GNUNET_free (filename); | 151 | GNUNET_free (filename); |
152 | #endif | ||
638 | } | 153 | } |
639 | 154 | ||
640 | 155 | ||
@@ -657,9 +172,7 @@ GNUNET_FS_directory_scan_start (const char *filename, | |||
657 | struct stat sbuf; | 172 | struct stat sbuf; |
658 | char *filename_expanded; | 173 | char *filename_expanded; |
659 | struct GNUNET_FS_DirScanner *ds; | 174 | struct GNUNET_FS_DirScanner *ds; |
660 | struct GNUNET_DISK_PipeHandle *progress_pipe; | 175 | char *args[4]; |
661 | struct GNUNET_DISK_PipeHandle *stop_pipe; | ||
662 | int ok; | ||
663 | 176 | ||
664 | if (0 != STAT (filename, &sbuf)) | 177 | if (0 != STAT (filename, &sbuf)) |
665 | return NULL; | 178 | return NULL; |
@@ -669,63 +182,25 @@ GNUNET_FS_directory_scan_start (const char *filename, | |||
669 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 182 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
670 | "Starting to scan directory `%s'\n", | 183 | "Starting to scan directory `%s'\n", |
671 | filename_expanded); | 184 | filename_expanded); |
672 | progress_pipe = GNUNET_DISK_pipe (GNUNET_NO, GNUNET_NO, GNUNET_NO, GNUNET_NO); | ||
673 | if (NULL == progress_pipe) | ||
674 | { | ||
675 | GNUNET_free (filename_expanded); | ||
676 | return NULL; | ||
677 | } | ||
678 | stop_pipe = GNUNET_DISK_pipe (GNUNET_NO, GNUNET_NO, GNUNET_NO, GNUNET_NO); | ||
679 | if (NULL == stop_pipe) | ||
680 | { | ||
681 | GNUNET_DISK_pipe_close (progress_pipe); | ||
682 | GNUNET_free (filename_expanded); | ||
683 | return NULL; | ||
684 | } | ||
685 | |||
686 | ds = GNUNET_malloc (sizeof (struct GNUNET_FS_DirScanner)); | 185 | ds = GNUNET_malloc (sizeof (struct GNUNET_FS_DirScanner)); |
687 | ds->progress_callback = cb; | 186 | ds->progress_callback = cb; |
688 | ds->progress_callback_cls = cb_cls; | 187 | ds->progress_callback_cls = cb_cls; |
689 | ds->stop_pipe = stop_pipe; | ||
690 | ds->stop_write = GNUNET_DISK_pipe_handle (ds->stop_pipe, | ||
691 | GNUNET_DISK_PIPE_END_WRITE); | ||
692 | ds->stop_read = GNUNET_DISK_pipe_handle (ds->stop_pipe, | ||
693 | GNUNET_DISK_PIPE_END_READ); | ||
694 | ds->progress_pipe = progress_pipe; | ||
695 | ds->progress_write = GNUNET_DISK_pipe_handle (progress_pipe, | ||
696 | GNUNET_DISK_PIPE_END_WRITE); | ||
697 | ds->progress_read = GNUNET_DISK_pipe_handle (progress_pipe, | ||
698 | GNUNET_DISK_PIPE_END_READ); | ||
699 | ds->filename_expanded = filename_expanded; | 188 | ds->filename_expanded = filename_expanded; |
700 | if (! disable_extractor) | 189 | ds->ex_arg = GNUNET_strdup ((disable_extractor) ? "-" : ex); |
701 | { | 190 | args[0] = "gnunet-helper-fs-publish"; |
702 | ds->plugins = EXTRACTOR_plugin_add_defaults (EXTRACTOR_OPTION_DEFAULT_POLICY); | 191 | args[1] = ds->filename_expanded; |
703 | if ( (NULL != ex) && strlen (ex) > 0) | 192 | args[2] = ds->ex_arg; |
704 | ds->plugins = EXTRACTOR_plugin_add_config (ds->plugins, ex, | 193 | args[3] = NULL; |
705 | EXTRACTOR_OPTION_DEFAULT_POLICY); | 194 | ds->helper = GNUNET_HELPER_start ("gnunet-helper-fs-publish", |
706 | } | 195 | args, |
707 | #if WINDOWS | 196 | &process_helper_msgs, |
708 | ds->thread = CreateThread (NULL, 0, | 197 | ds); |
709 | (LPTHREAD_START_ROUTINE) &run_directory_scan_thread, | 198 | if (NULL == ds->helper) |
710 | (LPVOID) ds, 0, NULL); | ||
711 | ok = (ds->thread != NULL); | ||
712 | #else | ||
713 | ok = (0 == pthread_create (&ds->thread, NULL, | ||
714 | &run_directory_scan_thread, ds)); | ||
715 | #endif | ||
716 | if (!ok) | ||
717 | { | 199 | { |
718 | EXTRACTOR_plugin_remove_all (ds->plugins); | ||
719 | GNUNET_free (filename_expanded); | 200 | GNUNET_free (filename_expanded); |
720 | GNUNET_DISK_pipe_close (stop_pipe); | ||
721 | GNUNET_DISK_pipe_close (progress_pipe); | ||
722 | GNUNET_free (ds); | 201 | GNUNET_free (ds); |
723 | return NULL; | 202 | return NULL; |
724 | } | 203 | } |
725 | ds->progress_read_task | ||
726 | = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, | ||
727 | ds->progress_read, | ||
728 | &read_progress_task, ds); | ||
729 | return ds; | 204 | return ds; |
730 | } | 205 | } |
731 | 206 | ||