aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2012-07-24 21:45:25 +0000
committerChristian Grothoff <christian@grothoff.org>2012-07-24 21:45:25 +0000
commite339d01741812b49fa8868610d0df993fec4e5db (patch)
treee52ba30b8a6c03eabe29e295aebd00e3fd1653ba
parentc84dfab73816abe94060b6c784fef551d4bc6084 (diff)
downloadlibextractor-e339d01741812b49fa8868610d0df993fec4e5db.tar.gz
libextractor-e339d01741812b49fa8868610d0df993fec4e5db.zip
-towards compilation
-rw-r--r--src/main/extractor.c303
-rw-r--r--src/main/extractor_plugins.c2
-rw-r--r--src/main/extractor_plugins.h7
3 files changed, 143 insertions, 169 deletions
diff --git a/src/main/extractor.c b/src/main/extractor.c
index a381928..a529545 100644
--- a/src/main/extractor.c
+++ b/src/main/extractor.c
@@ -31,6 +31,12 @@
31#include "extractor_plugins.h" 31#include "extractor_plugins.h"
32 32
33 33
34/**
35 * Size used for the shared memory segment.
36 */
37#define DEFAULT_SHM_SIZE (16 * 1024)
38
39
34#if 0 40#if 0
35/** 41/**
36 * Open a file 42 * Open a file
@@ -489,189 +495,131 @@ load_in_process_plugin (struct EXTRACTOR_PluginList *plugin)
489#endif 495#endif
490 496
491 497
498/**
499 * Closure for 'process_plugin_reply'
500 */
501struct PluginReplyProcessor
502{
503 /**
504 * Function to call if we receive meta data from the plugin.
505 */
506 EXTRACTOR_MetaDataProcessor proc;
507
508 /**
509 * Closure for 'proc'.
510 */
511 void *proc_cls;
512
513};
514
515
516/**
517 * Handler for a message from one of the plugins.
518 *
519 * @param cls closure with our 'struct PluginReplyProcessor'
520 * @param plugin plugin of the channel sending the message
521 * @param meta_type type of the meta data
522 * @param meta_format format of the meta data
523 * @param value_len number of bytes in 'value'
524 * @param value 'data' send from the plugin
525 * @param mime mime string send from the plugin
526 */
527static void
528process_plugin_reply (void *cls,
529 struct EXTRACTOR_PluginList *plugin,
530 enum EXTRACTOR_MetaType meta_type,
531 enum EXTRACTOR_MetaFormat meta_format,
532 size_t value_len,
533 const void *value,
534 const char *mime)
535{
536 struct PluginReplyProcessor *prp = cls;
537
538 // FIXME...
539}
492 540
493 541
494/** 542/**
495 * Extract keywords using the given set of plugins. 543 * Extract keywords using the given set of plugins.
496 * 544 *
497 * @param plugins the list of plugins to use 545 * @param plugins the list of plugins to use
498 * @param data data to process, or NULL if fds is not -1 546 * @param shm shared memory object used by the plugins (NULL if
499 * @param fd file to read data from, or -1 if data is not NULL 547 * all plugins are in-process)
500 * @param filename name of the file to which fd belongs 548 * @param ds data to process
501 * @param cfs compressed file source for compressed stream (may be NULL)
502 * @param fsize size of the file or data buffer
503 * @param proc function to call for each meta data item found 549 * @param proc function to call for each meta data item found
504 * @param proc_cls cls argument to proc 550 * @param proc_cls cls argument to proc
505 */ 551 */
506static void 552static void
507do_extract (struct EXTRACTOR_PluginList *plugins, 553do_extract (struct EXTRACTOR_PluginList *plugins,
508 const char *data, 554 struct EXTRACTOR_SharedMemory *shm,
509 int fd, 555 struct EXTRACTOR_Datasource *ds,
510 const char *filename,
511 struct CompressedFileSource *cfs,
512 int64_t fsize,
513 EXTRACTOR_MetaDataProcessor proc, void *proc_cls) 556 EXTRACTOR_MetaDataProcessor proc, void *proc_cls)
514{ 557{
515 int operation_mode; 558 unsigned int plugin_count;
516 int plugin_count = 0; 559 struct EXTRACTOR_PluginList *pos;
517 int shm_result; 560 struct StartMessage start;
518 unsigned char *shm_ptr; 561 struct EXTRACTOR_Channel *channel;
519#if !WINDOWS 562 struct PluginReplyProcessor prp;
520 int shm_id; 563 uint32_t ready;
521#else 564 int done;
522 HANDLE map_handle; 565
523#endif 566 plugin_count = 0;
524 char shm_name[MAX_SHM_NAME + 1]; 567 for (pos = plugins; NULL != pos; pos = pos->next)
525 568 plugin_count++;
526 struct EXTRACTOR_PluginList *ppos; 569 if (NULL != shm)
527 570 ready = EXTRACTOR_IPC_shared_memory_set_ (shm, ds, 0, DEFAULT_SHM_SIZE);
528 int64_t position = 0;
529 int64_t preserve = 0;
530 size_t map_size;
531 ssize_t read_result;
532 int kill_plugins = 0;
533
534 if (cfs != NULL)
535 operation_mode = OPMODE_DECOMPRESS;
536 else if (data != NULL)
537 operation_mode = OPMODE_MEMORY;
538 else if (fd != -1)
539 operation_mode = OPMODE_FILE;
540 else 571 else
541 return; 572 ready = 0;
542
543 map_size = (fd == -1) ? fsize : MAX_READ;
544
545 /* Make a shared memory object. Even if we're running in-process. Simpler that way.
546 * This is only for reading-from-memory case. For reading-from-file we will use
547 * the file itself; for uncompressing-on-the-fly the decompressor will make its own
548 * shared memory object and uncompress into it directly.
549 */
550 if (operation_mode == OPMODE_MEMORY)
551 {
552 operation_mode = OPMODE_MEMORY;
553#if !WINDOWS
554 shm_result = make_shm_posix ((void **) &shm_ptr, &shm_id, shm_name, MAX_SHM_NAME,
555 fsize);
556#else
557 shm_result = make_shm_w32 ((void **) &shm_ptr, &map_handle, shm_name, MAX_SHM_NAME,
558 fsize);
559#endif
560 if (shm_result != 0)
561 return;
562 memcpy (shm_ptr, data, fsize);
563 }
564 else if (operation_mode == OPMODE_FILE)
565 {
566#if WINDOWS
567 shm_result = make_file_backed_shm_w32 (&map_handle, (HANDLE) _get_osfhandle (fd), shm_name, MAX_SHM_NAME);
568 if (shm_result != 0)
569 return;
570#endif
571 }
572
573 /* This four-loops-instead-of-one construction is intended to increase parallelism */
574 for (ppos = plugins; NULL != ppos; ppos = ppos->next)
575 {
576 start_process (ppos);
577 plugin_count += 1;
578 }
579
580 for (ppos = plugins; NULL != ppos; ppos = ppos->next)
581 load_in_process_plugin (ppos);
582 573
583 for (ppos = plugins; NULL != ppos; ppos = ppos->next) 574 prp.proc = proc;
584 write_plugin_data (ppos); 575 prp.proc_cls = proc_cls;
585 576
586 if (operation_mode == OPMODE_DECOMPRESS) 577 /* send 'start' message */
578 start.opcode = MESSAGE_EXTRACT_START;
579 start.reserved = 0;
580 start.reserved2 = 0;
581 start.shm_ready_bytes = ready;
582 start.file_size = EXTRACTOR_datasource_get_size_ (ds);
587 { 583 {
588 for (ppos = plugins; NULL != ppos; ppos = ppos->next) 584 struct EXTRACTOR_Channel *channels[plugin_count];
589 init_plugin_state (ppos, operation_mode, cfs->shm_name, -1);
590 }
591 else if (operation_mode == OPMODE_FILE)
592 {
593 for (ppos = plugins; NULL != ppos; ppos = ppos->next)
594#if !WINDOWS
595 init_plugin_state (ppos, operation_mode, filename, fsize);
596#else
597 init_plugin_state (ppos, operation_mode, shm_name, fsize);
598#endif
599 }
600 else
601 {
602 for (ppos = plugins; NULL != ppos; ppos = ppos->next)
603 init_plugin_state (ppos, operation_mode, shm_name, fsize);
604 }
605 585
606 if (operation_mode == OPMODE_FILE || operation_mode == OPMODE_MEMORY) 586 plugin_count = 0;
607 { 587 for (pos = plugins; NULL != pos; pos = pos->next)
608 int plugins_not_ready = 0;
609 for (ppos = plugins; NULL != ppos; ppos = ppos->next)
610 plugins_not_ready += give_shm_to_plugin (ppos, position, map_size, fsize, operation_mode);
611 for (ppos = plugins; NULL != ppos; ppos = ppos->next)
612 ask_in_process_plugin (ppos, shm_ptr, proc, proc_cls);
613 while (plugins_not_ready > 0 && !kill_plugins)
614 {
615 int ready = wait_for_reply (plugins, proc, proc_cls);
616 if (ready <= 0)
617 kill_plugins = 1;
618 plugins_not_ready -= ready;
619 }
620 }
621 else
622 {
623 read_result = cfs_read (cfs, preserve);
624 if (read_result > 0)
625 while (1)
626 {
627 int plugins_not_ready = 0;
628
629 map_size = cfs->shm_buf_size;
630 for (ppos = plugins; NULL != ppos; ppos = ppos->next)
631 plugins_not_ready += give_shm_to_plugin (ppos, position, map_size, cfs->uncompressed_size, operation_mode);
632 /* Can't block in in-process plugins, unless we ONLY have one plugin */
633 if (plugin_count == 1)
634 for (ppos = plugins; NULL != ppos; ppos = ppos->next)
635 {
636 /* Pass this way. we'll need it to call cfs functions later on */
637 /* This is a special case */
638 ppos->pass_cfs = cfs;
639 ask_in_process_plugin (ppos, cfs->shm_ptr, proc, proc_cls);
640 }
641 while (plugins_not_ready > 0 && !kill_plugins)
642 { 588 {
643 int ready = wait_for_reply (plugins, proc, proc_cls); 589 channels[plugin_count] = pos->channel;
644 if (ready <= 0) 590 if ( (NULL != pos->channel) &&
645 kill_plugins = 1; 591 (-1 == EXTRACTOR_IPC_channel_send_ (pos->channel,
646 plugins_not_ready -= ready; 592 &start,
593 sizeof (start)) ) )
594 {
595 channels[plugin_count] = NULL;
596 EXTRACTOR_IPC_channel_destroy_ (pos->channel);
597 pos->channel = NULL;
598 }
599 plugin_count++;
600 }
601 done = 0;
602 while (! done)
603 {
604 done = 1;
605
606 // FIXME: need to handle 'seek' messages from plugins somewhere
607 if (-1 ==
608 EXTRACTOR_IPC_channel_recv_ (channels,
609 plugin_count,
610 &process_plugin_reply,
611 &prp))
612 break;
613 plugin_count = 0;
614 for (pos = plugins; NULL != pos; pos = pos->next)
615 {
616 channel = channels[plugin_count];
617 // ... FIXME ...
618 plugin_count++;
619 }
620 // FIXME: need to terminate once all plugins are done...
621 done = 0;
647 } 622 }
648 if (kill_plugins)
649 break;
650 position = seek_to_new_position (plugins, cfs, position, map_size);
651 if (position < 0 || position == cfs->uncompressed_size)
652 break;
653 }
654 }
655
656 if (kill_plugins)
657 for (ppos = plugins; NULL != ppos; ppos = ppos->next)
658 stop_process (ppos);
659 for (ppos = plugins; NULL != ppos; ppos = ppos->next)
660 discard_plugin_state (ppos);
661
662 if (operation_mode == OPMODE_MEMORY)
663 {
664#if WINDOWS
665 destroy_shm_w32 (shm_ptr, map_handle);
666#else
667 destroy_shm_posix (shm_ptr, shm_id, (fd == -1) ? fsize : MAX_READ, shm_name);
668#endif
669 }
670 else if (operation_mode == OPMODE_FILE)
671 {
672#if WINDOWS
673 destroy_file_backed_shm_w32 (map_handle);
674#endif
675 } 623 }
676} 624}
677 625
@@ -699,14 +647,34 @@ EXTRACTOR_extract (struct EXTRACTOR_PluginList *plugins,
699 void *proc_cls) 647 void *proc_cls)
700{ 648{
701 struct EXTRACTOR_Datasource *datasource; 649 struct EXTRACTOR_Datasource *datasource;
650 struct EXTRACTOR_SharedMemory *shm;
651 struct EXTRACTOR_PluginList *pos;
702 652
653 if (NULL == plugins)
654 return;
703 if (NULL == filename) 655 if (NULL == filename)
704 datasource = EXTRACTOR_datasource_create_from_buffer_ (data, size); 656 datasource = EXTRACTOR_datasource_create_from_buffer_ (data, size,
657 proc, proc_cls);
705 else 658 else
706 datasource = EXTRACTOR_datasource_create_from_file_ (filename); 659 datasource = EXTRACTOR_datasource_create_from_file_ (filename,
660 proc, proc_cls);
707 if (NULL == datasource) 661 if (NULL == datasource)
708 return; 662 return;
709 do_extract (plugins, datasource, proc, proc_cls); 663 shm = NULL;
664 for (pos = plugins; NULL != pos; pos = pos->next)
665 if (NULL != (shm = pos->shm))
666 break;
667 if (NULL == shm)
668 shm = EXTRACTOR_IPC_shared_memory_create_ (DEFAULT_SHM_SIZE);
669 for (pos = plugins; NULL != pos; pos = pos->next)
670 if ( (NULL == pos->shm) &&
671 (0 == (pos->flags & EXTRACTOR_OPTION_IN_PROCESS)) )
672 {
673 pos->shm = shm;
674 pos->channel = EXTRACTOR_IPC_channel_create_ (pos,
675 shm);
676 }
677 do_extract (plugins, shm, datasource, proc, proc_cls);
710 EXTRACTOR_datasource_destroy_ (datasource); 678 EXTRACTOR_datasource_destroy_ (datasource);
711} 679}
712 680
@@ -721,7 +689,6 @@ EXTRACTOR_ltdl_init ()
721 689
722#if ENABLE_NLS 690#if ENABLE_NLS
723 BINDTEXTDOMAIN (PACKAGE, LOCALEDIR); 691 BINDTEXTDOMAIN (PACKAGE, LOCALEDIR);
724 BINDTEXTDOMAIN ("iso-639", ISOLOCALEDIR); /* used by wordextractor */
725#endif 692#endif
726 err = lt_dlinit (); 693 err = lt_dlinit ();
727 if (err > 0) 694 if (err > 0)
diff --git a/src/main/extractor_plugins.c b/src/main/extractor_plugins.c
index 9a16a33..7f43808 100644
--- a/src/main/extractor_plugins.c
+++ b/src/main/extractor_plugins.c
@@ -380,6 +380,8 @@ EXTRACTOR_plugin_remove (struct EXTRACTOR_PluginList * prev,
380 prev->next = pos->next; 380 prev->next = pos->next;
381 if (NULL != pos->channel) 381 if (NULL != pos->channel)
382 EXTRACTOR_IPC_channel_destroy_ (pos->channel); 382 EXTRACTOR_IPC_channel_destroy_ (pos->channel);
383 // FIXME: need to also destroy pos->shm if this is
384 // the last user; need to add some RC to the SHM!
383 free (pos->short_libname); 385 free (pos->short_libname);
384 free (pos->libname); 386 free (pos->libname);
385 free (pos->plugin_options); 387 free (pos->plugin_options);
diff --git a/src/main/extractor_plugins.h b/src/main/extractor_plugins.h
index 0903f55..a15069a 100644
--- a/src/main/extractor_plugins.h
+++ b/src/main/extractor_plugins.h
@@ -78,11 +78,16 @@ struct EXTRACTOR_PluginList
78 const char *specials; 78 const char *specials;
79 79
80 /** 80 /**
81 * Channel to communicate with out-of-process plugin. 81 * Channel to communicate with out-of-process plugin, NULL if not setup.
82 */ 82 */
83 struct EXTRACTOR_Channel *channel; 83 struct EXTRACTOR_Channel *channel;
84 84
85 /** 85 /**
86 * Memory segment shared with the channel of this plugin, NULL for none.
87 */
88 struct EXTRACTOR_SharedMemory *shm;
89
90 /**
86 * A position this plugin wants us to seek to. -1 if it's finished. 91 * A position this plugin wants us to seek to. -1 if it's finished.
87 * Starts at 0. 92 * Starts at 0.
88 */ 93 */