libextractor

GNU libextractor
Log | Files | Refs | Submodules | README | LICENSE

commit e339d01741812b49fa8868610d0df993fec4e5db
parent c84dfab73816abe94060b6c784fef551d4bc6084
Author: Christian Grothoff <christian@grothoff.org>
Date:   Tue, 24 Jul 2012 21:45:25 +0000

-towards compilation

Diffstat:
Msrc/main/extractor.c | 303+++++++++++++++++++++++++++++++++++--------------------------------------------
Msrc/main/extractor_plugins.c | 2++
Msrc/main/extractor_plugins.h | 7++++++-
3 files changed, 143 insertions(+), 169 deletions(-)

diff --git a/src/main/extractor.c b/src/main/extractor.c @@ -31,6 +31,12 @@ #include "extractor_plugins.h" +/** + * Size used for the shared memory segment. + */ +#define DEFAULT_SHM_SIZE (16 * 1024) + + #if 0 /** * Open a file @@ -489,189 +495,131 @@ load_in_process_plugin (struct EXTRACTOR_PluginList *plugin) #endif +/** + * Closure for 'process_plugin_reply' + */ +struct PluginReplyProcessor +{ + /** + * Function to call if we receive meta data from the plugin. + */ + EXTRACTOR_MetaDataProcessor proc; + + /** + * Closure for 'proc'. + */ + void *proc_cls; + +}; + + +/** + * Handler for a message from one of the plugins. + * + * @param cls closure with our 'struct PluginReplyProcessor' + * @param plugin plugin of the channel sending the message + * @param meta_type type of the meta data + * @param meta_format format of the meta data + * @param value_len number of bytes in 'value' + * @param value 'data' send from the plugin + * @param mime mime string send from the plugin + */ +static void +process_plugin_reply (void *cls, + struct EXTRACTOR_PluginList *plugin, + enum EXTRACTOR_MetaType meta_type, + enum EXTRACTOR_MetaFormat meta_format, + size_t value_len, + const void *value, + const char *mime) +{ + struct PluginReplyProcessor *prp = cls; + + // FIXME... +} /** * Extract keywords using the given set of plugins. * * @param plugins the list of plugins to use - * @param data data to process, or NULL if fds is not -1 - * @param fd file to read data from, or -1 if data is not NULL - * @param filename name of the file to which fd belongs - * @param cfs compressed file source for compressed stream (may be NULL) - * @param fsize size of the file or data buffer + * @param shm shared memory object used by the plugins (NULL if + * all plugins are in-process) + * @param ds data to process * @param proc function to call for each meta data item found * @param proc_cls cls argument to proc */ static void do_extract (struct EXTRACTOR_PluginList *plugins, - const char *data, - int fd, - const char *filename, - struct CompressedFileSource *cfs, - int64_t fsize, + struct EXTRACTOR_SharedMemory *shm, + struct EXTRACTOR_Datasource *ds, EXTRACTOR_MetaDataProcessor proc, void *proc_cls) { - int operation_mode; - int plugin_count = 0; - int shm_result; - unsigned char *shm_ptr; -#if !WINDOWS - int shm_id; -#else - HANDLE map_handle; -#endif - char shm_name[MAX_SHM_NAME + 1]; - - struct EXTRACTOR_PluginList *ppos; - - int64_t position = 0; - int64_t preserve = 0; - size_t map_size; - ssize_t read_result; - int kill_plugins = 0; - - if (cfs != NULL) - operation_mode = OPMODE_DECOMPRESS; - else if (data != NULL) - operation_mode = OPMODE_MEMORY; - else if (fd != -1) - operation_mode = OPMODE_FILE; + unsigned int plugin_count; + struct EXTRACTOR_PluginList *pos; + struct StartMessage start; + struct EXTRACTOR_Channel *channel; + struct PluginReplyProcessor prp; + uint32_t ready; + int done; + + plugin_count = 0; + for (pos = plugins; NULL != pos; pos = pos->next) + plugin_count++; + if (NULL != shm) + ready = EXTRACTOR_IPC_shared_memory_set_ (shm, ds, 0, DEFAULT_SHM_SIZE); else - return; - - map_size = (fd == -1) ? fsize : MAX_READ; - - /* Make a shared memory object. Even if we're running in-process. Simpler that way. - * This is only for reading-from-memory case. For reading-from-file we will use - * the file itself; for uncompressing-on-the-fly the decompressor will make its own - * shared memory object and uncompress into it directly. - */ - if (operation_mode == OPMODE_MEMORY) - { - operation_mode = OPMODE_MEMORY; -#if !WINDOWS - shm_result = make_shm_posix ((void **) &shm_ptr, &shm_id, shm_name, MAX_SHM_NAME, - fsize); -#else - shm_result = make_shm_w32 ((void **) &shm_ptr, &map_handle, shm_name, MAX_SHM_NAME, - fsize); -#endif - if (shm_result != 0) - return; - memcpy (shm_ptr, data, fsize); - } - else if (operation_mode == OPMODE_FILE) - { -#if WINDOWS - shm_result = make_file_backed_shm_w32 (&map_handle, (HANDLE) _get_osfhandle (fd), shm_name, MAX_SHM_NAME); - if (shm_result != 0) - return; -#endif - } - - /* This four-loops-instead-of-one construction is intended to increase parallelism */ - for (ppos = plugins; NULL != ppos; ppos = ppos->next) - { - start_process (ppos); - plugin_count += 1; - } - - for (ppos = plugins; NULL != ppos; ppos = ppos->next) - load_in_process_plugin (ppos); + ready = 0; - for (ppos = plugins; NULL != ppos; ppos = ppos->next) - write_plugin_data (ppos); + prp.proc = proc; + prp.proc_cls = proc_cls; - if (operation_mode == OPMODE_DECOMPRESS) + /* send 'start' message */ + start.opcode = MESSAGE_EXTRACT_START; + start.reserved = 0; + start.reserved2 = 0; + start.shm_ready_bytes = ready; + start.file_size = EXTRACTOR_datasource_get_size_ (ds); { - for (ppos = plugins; NULL != ppos; ppos = ppos->next) - init_plugin_state (ppos, operation_mode, cfs->shm_name, -1); - } - else if (operation_mode == OPMODE_FILE) - { - for (ppos = plugins; NULL != ppos; ppos = ppos->next) -#if !WINDOWS - init_plugin_state (ppos, operation_mode, filename, fsize); -#else - init_plugin_state (ppos, operation_mode, shm_name, fsize); -#endif - } - else - { - for (ppos = plugins; NULL != ppos; ppos = ppos->next) - init_plugin_state (ppos, operation_mode, shm_name, fsize); - } + struct EXTRACTOR_Channel *channels[plugin_count]; - if (operation_mode == OPMODE_FILE || operation_mode == OPMODE_MEMORY) - { - int plugins_not_ready = 0; - for (ppos = plugins; NULL != ppos; ppos = ppos->next) - plugins_not_ready += give_shm_to_plugin (ppos, position, map_size, fsize, operation_mode); - for (ppos = plugins; NULL != ppos; ppos = ppos->next) - ask_in_process_plugin (ppos, shm_ptr, proc, proc_cls); - while (plugins_not_ready > 0 && !kill_plugins) - { - int ready = wait_for_reply (plugins, proc, proc_cls); - if (ready <= 0) - kill_plugins = 1; - plugins_not_ready -= ready; - } - } - else - { - read_result = cfs_read (cfs, preserve); - if (read_result > 0) - while (1) - { - int plugins_not_ready = 0; - - map_size = cfs->shm_buf_size; - for (ppos = plugins; NULL != ppos; ppos = ppos->next) - plugins_not_ready += give_shm_to_plugin (ppos, position, map_size, cfs->uncompressed_size, operation_mode); - /* Can't block in in-process plugins, unless we ONLY have one plugin */ - if (plugin_count == 1) - for (ppos = plugins; NULL != ppos; ppos = ppos->next) - { - /* Pass this way. we'll need it to call cfs functions later on */ - /* This is a special case */ - ppos->pass_cfs = cfs; - ask_in_process_plugin (ppos, cfs->shm_ptr, proc, proc_cls); - } - while (plugins_not_ready > 0 && !kill_plugins) + plugin_count = 0; + for (pos = plugins; NULL != pos; pos = pos->next) { - int ready = wait_for_reply (plugins, proc, proc_cls); - if (ready <= 0) - kill_plugins = 1; - plugins_not_ready -= ready; + channels[plugin_count] = pos->channel; + if ( (NULL != pos->channel) && + (-1 == EXTRACTOR_IPC_channel_send_ (pos->channel, + &start, + sizeof (start)) ) ) + { + channels[plugin_count] = NULL; + EXTRACTOR_IPC_channel_destroy_ (pos->channel); + pos->channel = NULL; + } + plugin_count++; + } + done = 0; + while (! done) + { + done = 1; + + // FIXME: need to handle 'seek' messages from plugins somewhere + if (-1 == + EXTRACTOR_IPC_channel_recv_ (channels, + plugin_count, + &process_plugin_reply, + &prp)) + break; + plugin_count = 0; + for (pos = plugins; NULL != pos; pos = pos->next) + { + channel = channels[plugin_count]; + // ... FIXME ... + plugin_count++; + } + // FIXME: need to terminate once all plugins are done... + done = 0; } - if (kill_plugins) - break; - position = seek_to_new_position (plugins, cfs, position, map_size); - if (position < 0 || position == cfs->uncompressed_size) - break; - } - } - - if (kill_plugins) - for (ppos = plugins; NULL != ppos; ppos = ppos->next) - stop_process (ppos); - for (ppos = plugins; NULL != ppos; ppos = ppos->next) - discard_plugin_state (ppos); - - if (operation_mode == OPMODE_MEMORY) - { -#if WINDOWS - destroy_shm_w32 (shm_ptr, map_handle); -#else - destroy_shm_posix (shm_ptr, shm_id, (fd == -1) ? fsize : MAX_READ, shm_name); -#endif - } - else if (operation_mode == OPMODE_FILE) - { -#if WINDOWS - destroy_file_backed_shm_w32 (map_handle); -#endif } } @@ -699,14 +647,34 @@ EXTRACTOR_extract (struct EXTRACTOR_PluginList *plugins, void *proc_cls) { struct EXTRACTOR_Datasource *datasource; + struct EXTRACTOR_SharedMemory *shm; + struct EXTRACTOR_PluginList *pos; + if (NULL == plugins) + return; if (NULL == filename) - datasource = EXTRACTOR_datasource_create_from_buffer_ (data, size); + datasource = EXTRACTOR_datasource_create_from_buffer_ (data, size, + proc, proc_cls); else - datasource = EXTRACTOR_datasource_create_from_file_ (filename); + datasource = EXTRACTOR_datasource_create_from_file_ (filename, + proc, proc_cls); if (NULL == datasource) - return; - do_extract (plugins, datasource, proc, proc_cls); + return; + shm = NULL; + for (pos = plugins; NULL != pos; pos = pos->next) + if (NULL != (shm = pos->shm)) + break; + if (NULL == shm) + shm = EXTRACTOR_IPC_shared_memory_create_ (DEFAULT_SHM_SIZE); + for (pos = plugins; NULL != pos; pos = pos->next) + if ( (NULL == pos->shm) && + (0 == (pos->flags & EXTRACTOR_OPTION_IN_PROCESS)) ) + { + pos->shm = shm; + pos->channel = EXTRACTOR_IPC_channel_create_ (pos, + shm); + } + do_extract (plugins, shm, datasource, proc, proc_cls); EXTRACTOR_datasource_destroy_ (datasource); } @@ -721,7 +689,6 @@ EXTRACTOR_ltdl_init () #if ENABLE_NLS BINDTEXTDOMAIN (PACKAGE, LOCALEDIR); - BINDTEXTDOMAIN ("iso-639", ISOLOCALEDIR); /* used by wordextractor */ #endif err = lt_dlinit (); if (err > 0) diff --git a/src/main/extractor_plugins.c b/src/main/extractor_plugins.c @@ -380,6 +380,8 @@ EXTRACTOR_plugin_remove (struct EXTRACTOR_PluginList * prev, prev->next = pos->next; if (NULL != pos->channel) EXTRACTOR_IPC_channel_destroy_ (pos->channel); + // FIXME: need to also destroy pos->shm if this is + // the last user; need to add some RC to the SHM! free (pos->short_libname); free (pos->libname); free (pos->plugin_options); diff --git a/src/main/extractor_plugins.h b/src/main/extractor_plugins.h @@ -78,11 +78,16 @@ struct EXTRACTOR_PluginList const char *specials; /** - * Channel to communicate with out-of-process plugin. + * Channel to communicate with out-of-process plugin, NULL if not setup. */ struct EXTRACTOR_Channel *channel; /** + * Memory segment shared with the channel of this plugin, NULL for none. + */ + struct EXTRACTOR_SharedMemory *shm; + + /** * A position this plugin wants us to seek to. -1 if it's finished. * Starts at 0. */