libextractor

GNU libextractor
Log | Files | Refs | Submodules | README | LICENSE

commit 1687a6aff0c356fb2638f293236097c170bde7e7
parent 21a2b41325ef87f55e3755a14f46f6f91dae599e
Author: LRN <lrn1986@gmail.com>
Date:   Wed,  1 Aug 2012 20:25:43 +0000

One huge fix. Sorry.

Diffstat:
Msrc/main/extractor.c | 18++++++++++++++----
Msrc/main/extractor_ipc.c | 2++
Msrc/main/extractor_ipc.h | 10++++++++++
Msrc/main/extractor_ipc_gnu.c | 14++++++++++++++
Msrc/main/extractor_ipc_w32.c | 761++++++++++++++++++++++++++++++++-----------------------------------------------
Msrc/main/extractor_plugin_main.c | 25+++++++++++++++----------
Msrc/main/test_extractor.c | 113+++++++++++++++++++++++++++++++++++++++++++------------------------------------
Msrc/main/test_ipc.c | 2+-
Msrc/plugins/Makefile.am | 16++++++++--------
9 files changed, 434 insertions(+), 527 deletions(-)

diff --git a/src/main/extractor.c b/src/main/extractor.c @@ -216,6 +216,7 @@ do_extract (struct EXTRACTOR_PluginList *plugins, struct PluginReplyProcessor prp; int64_t min_seek; ssize_t data_available; + uint64_t last_position; uint32_t ready; int done; @@ -227,6 +228,8 @@ do_extract (struct EXTRACTOR_PluginList *plugins, else ready = 0; + last_position = UINT64_MAX; + prp.file_finished = 0; prp.proc = proc; prp.proc_cls = proc_cls; @@ -285,7 +288,10 @@ do_extract (struct EXTRACTOR_PluginList *plugins, if ( (1 == pos->round_finished) || (NULL == pos->channel) ) continue; /* inactive plugin */ - if (-1 == pos->seek_request) + if (((last_position <= pos->seek_request) && + (last_position + data_available > pos->seek_request)) && + ((data_available > 0) || + (last_position == EXTRACTOR_datasource_get_size_ (ds)))) { done = 0; /* possibly more meta data at current position! */ break; @@ -318,15 +324,17 @@ do_extract (struct EXTRACTOR_PluginList *plugins, { if (NULL == (channel = pos->channel)) continue; - if ( (-1 != pos->seek_request) && + if ( (-1 != pos->seek_request) && ((last_position > pos->seek_request) || + (last_position + data_available <= pos->seek_request)) && (min_seek <= pos->seek_request) && - (min_seek + data_available > pos->seek_request) ) + ((min_seek + data_available > pos->seek_request) || + (min_seek == EXTRACTOR_datasource_get_size_ (ds))) ) { send_update_message (pos, min_seek, data_available, ds); - pos->seek_request = -1; + /*pos->seek_request = -1;*/ } if ( (-1 != pos->seek_request) && (1 == prp.file_finished) ) @@ -337,6 +345,8 @@ do_extract (struct EXTRACTOR_PluginList *plugins, if (0 == pos->round_finished) done = 0; /* can't be done, plugin still active */ } + if (min_seek >= 0) + last_position = min_seek; } /* run in-process plugins */ diff --git a/src/main/extractor_ipc.c b/src/main/extractor_ipc.c @@ -105,6 +105,8 @@ EXTRACTOR_IPC_process_reply_ (struct EXTRACTOR_PluginList *plugin, value = NULL; else value = &cdata[sizeof (struct MetaMessage) + meta.mime_length]; + if (meta.meta_type >= EXTRACTOR_metatype_get_max ()) + meta.meta_type = EXTRACTOR_METATYPE_UNKNOWN; proc (proc_cls, plugin, (enum EXTRACTOR_MetaType) meta.meta_type, diff --git a/src/main/extractor_ipc.h b/src/main/extractor_ipc.h @@ -370,6 +370,16 @@ EXTRACTOR_IPC_shared_memory_set_ (struct EXTRACTOR_SharedMemory *shm, /** + * Query datasource for current position + * + * @param ds data source to query + * @return current position in the datasource or UINT_MAX on error + */ +uint64_t +EXTRACTOR_datasource_get_pos_ (struct EXTRACTOR_Datasource *ds); + + +/** * Create a channel to communicate with a process wrapping * the plugin of the given name. Starts the process as well. * diff --git a/src/main/extractor_ipc_gnu.c b/src/main/extractor_ipc_gnu.c @@ -241,6 +241,20 @@ EXTRACTOR_IPC_shared_memory_set_ (struct EXTRACTOR_SharedMemory *shm, size); } +/** + * Query datasource for current position + * + * @param ds data source to query + * @return current position in the datasource or UINT_MAX on error + */ +uint64_t +EXTRACTOR_datasource_get_pos_ (struct EXTRACTOR_Datasource *ds) +{ + int64_t pos = EXTRACTOR_datasource_seek_ (ds, 0, SEEK_CUR); + if (-1 == pos) + return UINT_MAX; + return pos; +} /** * Create a channel to communicate with a process wrapping diff --git a/src/main/extractor_ipc_w32.c b/src/main/extractor_ipc_w32.c @@ -23,7 +23,9 @@ #include "extractor.h" #include "extractor_datasource.h" #include "extractor_plugin_main.h" +#include "extractor_plugins.h" #include "extractor_ipc.h" +#include "extractor_logging.h" /** */ @@ -33,7 +35,7 @@ struct EXTRACTOR_SharedMemory /** * W32 handle of the shm into which data is uncompressed */ - HANDLE shm; + HANDLE map; /** * Name of the shm @@ -43,12 +45,12 @@ struct EXTRACTOR_SharedMemory /** * Pointer to the mapped region of the shm (covers the whole shm) */ - void *shm_ptr; + void *ptr; /** * Position within shm */ - int64_t shm_pos; + int64_t pos; /** * Allocated size of the shm @@ -60,7 +62,12 @@ struct EXTRACTOR_SharedMemory */ size_t shm_buf_size; + size_t shm_map_size; + /** + * Reference counter describing how many references share this SHM. + */ + unsigned int rc; }; @@ -113,125 +120,135 @@ struct EXTRACTOR_Channel */ unsigned char *ov_write_buffer; + /** + * The plugin this channel is to communicate with. + */ + struct EXTRACTOR_PluginList *plugin; + + /** + * Memory segment shared with this process. + */ + struct EXTRACTOR_SharedMemory *shm; + + void *old_buf; + + /** + * Buffer for reading data from the plugin. + * FIXME: we might want to grow this + * buffer dynamically instead of always using 32 MB! + */ + char data[MAX_META_DATA]; + + /** + * Number of valid bytes in the channel's buffer. + */ + size_t size; }; /** - * Initializes an extracting session for a plugin. - * opens the file/shm (only in OPMODE_FILE) - * sets shm_ptr to NULL (unmaps it, if it was mapped) - * sets position to 0 - * initializes file size to 'fsize' (may be -1) - * sets seek request to 0 + * Create a shared memory area. * - * @param plugin plugin context - * @param operation_mode the mode of operation (OPMODE_*) - * @param fsize size of the source file (may be -1) - * @param shm_name name of the shm or file to open - * @return 0 on success, non-0 on error. - */ -static int -init_state_method (struct EXTRACTOR_PluginList *plugin, - uint8_t operation_mode, - int64_t fsize, - const char *shm_name) + * @param size size of the shared area + * @return NULL on error + */ +struct EXTRACTOR_SharedMemory * +EXTRACTOR_IPC_shared_memory_create_ (size_t size) { - plugin->seek_request = 0; - if (plugin->shm_ptr != NULL) - UnmapViewOfFile (plugin->shm_ptr); - plugin->shm_ptr = NULL; - if (INVALID_HANDLE_VALUE == plugin_open_shm (plugin, shm_name)) - return 1; - plugin->fsize = fsize; - plugin->shm_pos = 0; - plugin->fpos = 0; - return 0; + struct EXTRACTOR_SharedMemory *shm; + const char *tpath = "Local\\"; + + if (NULL == (shm = malloc (sizeof (struct EXTRACTOR_SharedMemory)))) + return NULL; + + snprintf (shm->shm_name, MAX_SHM_NAME, + "%slibextractor-shm-%u-%u", + tpath, getpid(), + (unsigned int) RANDOM()); + shm->map = CreateFileMapping (INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, size, shm->shm_name); + shm->ptr = MapViewOfFile (shm->map, FILE_MAP_WRITE, 0, 0, size); + if (shm->ptr == NULL) + { + CloseHandle (shm->map); + free (shm); + return NULL; + } + shm->shm_size = size; + shm->rc = 0; + return shm; } /** - * Opens a shared memory object (for later mmapping). - * This is W32 variant of the plugin_open_* function. - * Opened shm might be memory-backed or file-backed (depending on how - * it was created). shm_name is never a file name, unlike POSIX. - * Closes a shm is already opened, closes it before opening a new one. + * Change the reference counter for this shm instance. * - * @param plugin plugin context - * @param shm_name name of the shared memory object. - * @return memory-mapped file handle (NULL on error). That is, the result of OpenFileMapping() syscall. - */ -static HANDLE -plugin_open_shm (struct EXTRACTOR_PluginList *plugin, - const char *shm_name) + * @param shm instance to update + * @param delta value to change RC by + * @return new RC + */ +unsigned int +EXTRACTOR_IPC_shared_memory_change_rc_ (struct EXTRACTOR_SharedMemory *shm, + int delta) { - if (plugin->map_handle != 0) - CloseHandle (plugin->map_handle); - plugin->map_handle = OpenFileMapping (FILE_MAP_READ, FALSE, shm_name); - return plugin->map_handle; + shm->rc += delta; + return shm->rc; } /** - * Another name for plugin_open_shm(). - */ -static HANDLE -plugin_open_file (struct EXTRACTOR_PluginList *plugin, - const char *shm_name) -{ - return plugin_open_shm (plugin, shm_name); + * Destroy shared memory area. + * + * @param shm memory area to destroy + * @return NULL on error + */ +void +EXTRACTOR_IPC_shared_memory_destroy_ (struct EXTRACTOR_SharedMemory *shm) +{ + if (shm->ptr != NULL) + UnmapViewOfFile (shm->ptr); + if (shm->map != 0) + CloseHandle (shm->map); + free (shm); } - /** - * Initializes an extracting session for a plugin. - * opens the file/shm (only in OPMODE_FILE) - * sets shm_ptr to NULL (unmaps it, if it was mapped) - * sets position to 0 - * initializes file size to 'fsize' (may be -1) - * sets seek request to 0 + * Initialize shared memory area from data source. * - * @param plugin plugin context - * @param operation_mode the mode of operation (OPMODE_*) - * @param fsize size of the source file (may be -1) - * @param shm_name name of the shm or file to open - * @return 0 on success, non-0 on error. - */ -static int -init_state_method (struct EXTRACTOR_PluginList *plugin, - uint8_t operation_mode, - int64_t fsize, - const char *shm_name) + * @param shm memory area to initialize + * @param ds data source to use for initialization + * @param off offset to use in data source + * @param size number of bytes to copy + * @return -1 on error, otherwise number of bytes copied + */ +ssize_t +EXTRACTOR_IPC_shared_memory_set_ (struct EXTRACTOR_SharedMemory *shm, + struct EXTRACTOR_Datasource *ds, + uint64_t off, + size_t size) { - plugin->seek_request = 0; - if (plugin->shm_ptr != NULL) - UnmapViewOfFile (plugin->shm_ptr); - plugin->shm_ptr = NULL; - if (INVALID_HANDLE_VALUE == plugin_open_shm (plugin, shm_name)) - return 1; - plugin->fsize = fsize; - plugin->shm_pos = 0; - plugin->fpos = 0; - return 0; + if (-1 == + EXTRACTOR_datasource_seek_ (ds, off, SEEK_SET)) + return -1; + if (size > shm->shm_size) + size = shm->shm_size; + return EXTRACTOR_datasource_read_ (ds, + shm->ptr, + size); } /** - * Deinitializes an extracting session for a plugin. - * unmaps shm_ptr (if was mapped) - * closes file/shm (if it was opened) - * sets map size and shm_ptr to NULL. + * Query datasource for current position * - * @param plugin plugin context - */ -static void -discard_state_method (struct EXTRACTOR_PluginList *plugin) + * @param ds data source to query + * @return current position in the datasource or UINT_MAX on error + */ +uint64_t +EXTRACTOR_datasource_get_pos_ (struct EXTRACTOR_Datasource *ds) { - if (plugin->shm_ptr != NULL) - UnmapViewOfFile (plugin->shm_ptr); - if (plugin->map_handle != 0) - CloseHandle (plugin->map_handle); - plugin->map_handle = 0; - plugin->map_size = 0; - plugin->shm_ptr = NULL; + int64_t pos = EXTRACTOR_datasource_seek_ (ds, 0, SEEK_CUR); + if (-1 == pos) + return UINT_MAX; + return pos; } @@ -341,80 +358,6 @@ create_selectable_pipe (PHANDLE read_pipe_ptr, PHANDLE write_pipe_ptr, return 0; } - -/** - * Writes @size bytes from @buf to @h, using @ov for - * overlapped i/o. Deallocates @old_buf and sets it to NULL, - * if necessary. - * Writes asynchronously, but sequentially (only one writing - * operation may be active at any given moment, but it will - * be done in background). Thus it is intended to be used - * for writing a few big chunks rather than a lot of small pieces. - * - * The extravagant interface is mainly because this function - * does not use a separate struct to group together overlapped - * structure, buffer pointer and the handle. - * - * @param h pipe handle - * @param ov overlapped structure pointer - * @param buf buffer to read from. Will be copied internally - * @param size number of bytes to write - * @param old_buf pointer where a copy of previous buffer is stored, - * and where a copy of @buf will be stored. - * - * @return number of bytes written, -1 on error - */ -static int -write_to_pipe (HANDLE h, - OVERLAPPED *ov, - unsigned char *buf, size_t size, - unsigned char **old_buf) -{ - DWORD written; - BOOL bresult; - DWORD err; - - if (WAIT_OBJECT_0 != WaitForSingleObject (ov->hEvent, INFINITE)) - return -1; - - ResetEvent (ov->hEvent); - - if (*old_buf != NULL) - free (*old_buf); - - *old_buf = malloc (size); - if (*old_buf == NULL) - return -1; - memcpy (*old_buf, buf, size); - written = 0; - ov->Offset = 0; - ov->OffsetHigh = 0; - ov->Pointer = 0; - ov->Internal = 0; - ov->InternalHigh = 0; - bresult = WriteFile (h, *old_buf, size, &written, ov); - - if (bresult == TRUE) - { - SetEvent (ov->hEvent); - free (*old_buf); - *old_buf = NULL; - return written; - } - - err = GetLastError (); - if (err == ERROR_IO_PENDING) - return size; - SetEvent (ov->hEvent); - *old_buf = NULL; - SetLastError (err); - return -1; -} - - -#define plugin_write(plug, buf, size) write_to_pipe (plug->cpipe_in, &plug->ov_write, buf, size, &plug->ov_write_buffer) - - /** * Communicates plugin data (library name, options) to the plugin * process. This is only necessary on W32, where this information @@ -425,27 +368,15 @@ write_to_pipe (HANDLE h, * @return 0 on success, -1 on failure */ static int -write_plugin_data (struct EXTRACTOR_PluginList *plugin) +write_plugin_data (struct EXTRACTOR_PluginList *plugin, + struct EXTRACTOR_Channel *channel) { size_t libname_len, shortname_len, opts_len; DWORD len; char *str; size_t total_len = 0; unsigned char *buf, *ptr; - - switch (plugin->flags) - { - case EXTRACTOR_OPTION_DEFAULT_POLICY: - break; - case EXTRACTOR_OPTION_OUT_OF_PROCESS_NO_RESTART: - break; - case EXTRACTOR_OPTION_IN_PROCESS: - return 0; - break; - case EXTRACTOR_OPTION_DISABLED: - return 0; - break; - } + ssize_t write_result; libname_len = strlen (plugin->libname) + 1; total_len += sizeof (size_t) + libname_len; @@ -481,24 +412,29 @@ write_plugin_data (struct EXTRACTOR_PluginList *plugin) memcpy (ptr, plugin->plugin_options, opts_len); ptr += opts_len; } - if (total_len != write_to_pipe (plugin->cpipe_in, &plugin->ov_write, buf, total_len, &plugin->ov_write_buffer)) - { - free (buf); - return -1; - } + write_result = EXTRACTOR_IPC_channel_send_ (channel, buf, total_len); free (buf); - return 0; + return total_len == write_result; } - /** - * Start the process for the given plugin. + * Create a channel to communicate with a process wrapping + * the plugin of the given name. Starts the process as well. + * + * @param plugin the plugin + * @param shm memory to share with the process + * @return NULL on error, otherwise IPC channel */ -static void -start_process (struct EXTRACTOR_PluginList *plugin) +struct EXTRACTOR_Channel * +EXTRACTOR_IPC_channel_create_ (struct EXTRACTOR_PluginList *plugin, + struct EXTRACTOR_SharedMemory *shm) { + struct EXTRACTOR_Channel *channel; HANDLE p1[2]; HANDLE p2[2]; + struct InitMessage *init; + size_t slen; + STARTUPINFO startup; PROCESS_INFORMATION proc; char cmd[MAX_PATH + 1]; @@ -507,50 +443,40 @@ start_process (struct EXTRACTOR_PluginList *plugin) HANDLE p21_os_inh = INVALID_HANDLE_VALUE; SECURITY_ATTRIBUTES sa; - switch (plugin->flags) - { - case EXTRACTOR_OPTION_DEFAULT_POLICY: - if (plugin->hProcess != INVALID_HANDLE_VALUE && plugin->hProcess != 0) - return; - break; - case EXTRACTOR_OPTION_OUT_OF_PROCESS_NO_RESTART: - if (plugin->hProcess != 0) - return; - break; - case EXTRACTOR_OPTION_IN_PROCESS: - return; - break; - case EXTRACTOR_OPTION_DISABLED: - return; - break; - } + if (NULL == (channel = malloc (sizeof (struct EXTRACTOR_Channel)))) + { + LOG_STRERROR ("malloc"); + return NULL; + } + channel->shm = shm; + channel->plugin = plugin; + channel->size = 0; sa.nLength = sizeof (sa); sa.lpSecurityDescriptor = NULL; sa.bInheritHandle = FALSE; - plugin->hProcess = NULL; - if (0 != create_selectable_pipe (&p1[0], &p1[1], &sa, 1024, FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED)) { - plugin->flags = EXTRACTOR_OPTION_DISABLED; - return; + LOG_STRERROR ("pipe"); + free (channel); + return NULL; } if (0 != create_selectable_pipe (&p2[0], &p2[1], &sa, 1024, FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED)) { + LOG_STRERROR ("pipe"); CloseHandle (p1[0]); CloseHandle (p1[1]); - plugin->flags = EXTRACTOR_OPTION_DISABLED; - return; + free (channel); + return NULL; } - memset (&startup, 0, sizeof (STARTUPINFO)); - if (!DuplicateHandle (GetCurrentProcess (), p1[0], GetCurrentProcess (), &p10_os_inh, 0, TRUE, DUPLICATE_SAME_ACCESS) || !DuplicateHandle (GetCurrentProcess (), p2[1], GetCurrentProcess (), &p21_os_inh, 0, TRUE, DUPLICATE_SAME_ACCESS)) { + LOG_STRERROR ("DuplicateHandle"); if (p10_os_inh != INVALID_HANDLE_VALUE) CloseHandle (p10_os_inh); if (p21_os_inh != INVALID_HANDLE_VALUE) @@ -559,10 +485,14 @@ start_process (struct EXTRACTOR_PluginList *plugin) CloseHandle (p1[1]); CloseHandle (p2[0]); CloseHandle (p2[1]); - plugin->flags = EXTRACTOR_OPTION_DISABLED; - return; + CloseHandle (p1[0]); + CloseHandle (p1[1]); + free (channel); + return NULL; } + memset (&startup, 0, sizeof (STARTUPINFO)); + /* TODO: write our own plugin-hosting executable? rundll32, for once, has smaller than usual stack size. * Also, users might freak out seeing over 9000 rundll32 processes (seeing over 9000 processes named * "libextractor_plugin_helper" is probably less confusing). @@ -571,323 +501,250 @@ start_process (struct EXTRACTOR_PluginList *plugin) "rundll32.exe libextractor-3.dll,RundllEntryPoint@16 %lu %lu", p10_os_inh, p21_os_inh); cmd[MAX_PATH] = '\0'; - if (CreateProcessA (NULL, cmd, NULL, NULL, TRUE, 0, NULL, NULL, + if (CreateProcessA (NULL, cmd, NULL, NULL, TRUE, CREATE_SUSPENDED, NULL, NULL, &startup, &proc)) { - plugin->hProcess = proc.hProcess; + channel->hProcess = proc.hProcess; + ResumeThread (proc.hThread); CloseHandle (proc.hThread); } else { + LOG_STRERROR ("CreateProcess"); CloseHandle (p1[0]); CloseHandle (p1[1]); CloseHandle (p2[0]); CloseHandle (p2[1]); - plugin->flags = EXTRACTOR_OPTION_DISABLED; - return; + free (channel); + return NULL; } CloseHandle (p1[0]); CloseHandle (p2[1]); CloseHandle (p10_os_inh); CloseHandle (p21_os_inh); - plugin->cpipe_in = p1[1]; - plugin->cpipe_out = p2[0]; - - memset (&plugin->ov_read, 0, sizeof (OVERLAPPED)); - memset (&plugin->ov_write, 0, sizeof (OVERLAPPED)); + channel->cpipe_in = p1[1]; + channel->cpipe_out = p2[0]; - plugin->ov_write_buffer = NULL; - - plugin->ov_write.hEvent = CreateEvent (NULL, TRUE, TRUE, NULL); - plugin->ov_read.hEvent = CreateEvent (NULL, TRUE, TRUE, NULL); -} + memset (&channel->ov_read, 0, sizeof (OVERLAPPED)); + memset (&channel->ov_write, 0, sizeof (OVERLAPPED)); + channel->ov_write_buffer = NULL; -/** - * Receive 'size' bytes from channel, store them in 'buf' - * - * @param plugin plugin context - * @param buf buffer to fill - * @param size number of bytes to read - * @return number of bytes read, 0 on EOS, < 0 on error - */ -static int -plugin_read (struct EXTRACTOR_PluginList *plugin, - void *buf, - size_t size) -{ - char *rb = buf; - ssize_t read_result; - size_t read_count = 0; - - while (read_count < size) - { - read_result = read (plugin->cpipe_out, - &rb[read_count], size - read_count); - if (read_result <= 0) - return read_result; - read_count += read_result; - } - return read_count; -} - - -/** - * Stop the child process of this plugin. - */ -static void -stop_process (struct EXTRACTOR_PluginList *plugin) -{ - int status; - HANDLE process; + channel->ov_write.hEvent = CreateEvent (NULL, TRUE, TRUE, NULL); + channel->ov_read.hEvent = CreateEvent (NULL, TRUE, TRUE, NULL); -#if DEBUG - if (plugin->hProcess == INVALID_HANDLE_VALUE) - fprintf (stderr, - "Plugin `%s' choked on this input\n", - plugin->short_libname); -#endif - if (plugin->hProcess == INVALID_HANDLE_VALUE || - plugin->hProcess == NULL) - return; - TerminateProcess (plugin->hProcess, 0); - CloseHandle (plugin->hProcess); - plugin->hProcess = INVALID_HANDLE_VALUE; - CloseHandle (plugin->cpipe_out); - CloseHandle (plugin->cpipe_in); - plugin->cpipe_out = INVALID_HANDLE_VALUE; - plugin->cpipe_in = INVALID_HANDLE_VALUE; - CloseHandle (plugin->ov_read.hEvent); - CloseHandle (plugin->ov_write.hEvent); - if (plugin->ov_write_buffer != NULL) + if (!write_plugin_data (plugin, channel)) { - free (plugin->ov_write_buffer); - plugin->ov_write_buffer = NULL; + LOG_STRERROR ("write_plugin_data"); + EXTRACTOR_IPC_channel_destroy_ (channel); + return NULL; } - if (plugin->flags != EXTRACTOR_OPTION_DEFAULT_POLICY) - plugin->flags = EXTRACTOR_OPTION_DISABLED; - - plugin->seek_request = -1; + slen = strlen (shm->shm_name) + 1; + if (NULL == (init = malloc (sizeof (struct InitMessage) + slen))) + { + LOG_STRERROR ("malloc"); + EXTRACTOR_IPC_channel_destroy_ (channel); + return NULL; + } + init->opcode = MESSAGE_INIT_STATE; + init->reserved = 0; + init->reserved2 = 0; + init->shm_name_length = slen; + init->shm_map_size = shm->shm_size; + memcpy (&init[1], shm->shm_name, slen); + if (sizeof (struct InitMessage) + slen != + EXTRACTOR_IPC_channel_send_ (channel, init, + sizeof (struct InitMessage) + slen)) + { + LOG ("Failed to send INIT_STATE message to plugin\n"); + EXTRACTOR_IPC_channel_destroy_ (channel); + return NULL; + } + return channel; } - - /** - * Setup a shared memory segment. + * Destroy communication channel with a plugin/process. Also + * destroys the process. * - * @param ptr set to the location of the map segment - * @param map where to store the map handle - * @param fn name of the mapping - * @param fn_size size available in fn - * @param size number of bytes to allocated for the mapping - * @return 0 on success + * @param channel channel to communicate with the plugin */ -static int -make_shm_w32 (void **ptr, - HANDLE *map, - char *fn, - size_t fn_size, size_t size) +void +EXTRACTOR_IPC_channel_destroy_ (struct EXTRACTOR_Channel *channel) { - const char *tpath = "Local\\"; + int status; - snprintf (fn, fn_size, - "%slibextractor-shm-%u-%u", - tpath, getpid(), - (unsigned int) RANDOM()); - *map = CreateFileMapping (INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, size, fn); - *ptr = MapViewOfFile (*map, FILE_MAP_WRITE, 0, 0, size); - if (*ptr == NULL) + TerminateProcess (channel->hProcess, 0); + CloseHandle (channel->hProcess); + CloseHandle (channel->cpipe_out); + CloseHandle (channel->cpipe_in); + CloseHandle (channel->ov_read.hEvent); + CloseHandle (channel->ov_write.hEvent); + if (channel->ov_write_buffer != NULL) { - CloseHandle (*map); - return 1; + free (channel->ov_write_buffer); + channel->ov_write_buffer = NULL; } - return 0; + free (channel); } /** - * Setup a file-backed shared memory segment. + * Send data via the given IPC channel (blocking). * - * @param map where to store the map handle - * @param file handle of the file to back the shm - * @param fn name of the mapping - * @param fn_size size available in fn - * @param size number of bytes to allocated for the mapping - * @return 0 on success + * @param channel channel to communicate with the plugin + * @param buf data to send + * @param size number of bytes in buf to send + * @return -1 on error, number of bytes sent on success + * (never does partial writes) */ -static int -make_file_backed_shm_w32 (HANDLE *map, HANDLE file, char *fn, size_t fn_size) -{ - const char *tpath = "Local\\"; - snprintf (fn, fn_size, "%slibextractor-shm-%u-%u", tpath, getpid(), - (unsigned int) RANDOM()); - *map = CreateFileMapping (file, NULL, PAGE_READONLY, 0, 0, fn); - if (*map == NULL) - { - DWORD err = GetLastError (); - return 1; - } - return 0; -} - - -static void -destroy_shm_w32 (void *ptr, HANDLE map) -{ - UnmapViewOfFile (ptr); - CloseHandle (map); -} - - -static void -destroy_file_backed_shm_w32 (HANDLE map) +ssize_t +EXTRACTOR_IPC_channel_send_ (struct EXTRACTOR_Channel *channel, + const void *data, + size_t size) { - CloseHandle (map); -} + DWORD written; + DWORD err; + BOOL bresult; + const char *cdata = data; + if (WAIT_OBJECT_0 != WaitForSingleObject (channel->ov_write.hEvent, INFINITE)) + return -1; + ResetEvent (channel->ov_write.hEvent); -#define plugin_write(plug, buf, size) write_all (fileno (plug->cpipe_in), buf, size) + if (channel->old_buf != NULL) + free (channel->old_buf); + channel->old_buf = malloc (size); + if (channel->old_buf == NULL) + return -1; -/** - * Receive 'size' bytes from plugin, store them in 'buf' - * - * @param plugin plugin context - * @param buf buffer to fill - * @param size number of bytes to read - * @return number of bytes read, 0 on EOS, < 0 on error - */ -static int -plugin_read (struct EXTRACTOR_PluginList *plugin, - void *buf, size_t size) -{ - char *rb = buf; - DWORD bytes_read; - size_t read_count = 0; + memcpy (channel->old_buf, data, size); + written = 0; + channel->ov_write.Offset = 0; + channel->ov_write.OffsetHigh = 0; + channel->ov_write.Pointer = 0; + channel->ov_write.Internal = 0; + channel->ov_write.InternalHigh = 0; + bresult = WriteFile (channel->cpipe_in, channel->old_buf, size, &written, &channel->ov_write); - while (read_count < size) + if (bresult == TRUE) { - if (! ReadFile (plugin->cpipe_out, - &rb[read_count], size - read_count, - &bytes_read, NULL)) - return -1; - read_count += bytes_read; + SetEvent (channel->ov_write.hEvent); + free (channel->old_buf); + channel->old_buf = NULL; + return written; } - return read_count; + + err = GetLastError (); + if (err == ERROR_IO_PENDING) + return size; + SetEvent (channel->ov_write.hEvent); + free (channel->old_buf); + channel->old_buf = NULL; + SetLastError (err); + return -1; } /** + * Receive data from any of the given IPC channels (blocking). * Wait for one of the plugins to reply. - * Selects on plugin output pipes, runs receive_reply() + * Selects on plugin output pipes, runs 'receive_reply' * on each activated pipe until it gets a seek request * or a done message. Called repeatedly by the user until all pipes are dry or * broken. - * This W32 version of wait_for_reply() can't select on more than 64 plugins - * at once (returns -1 if there are more than 64 plugins). * - * @param plugins to select upon - * @param proc metadata callback - * @param proc_cls callback cls - * @return number of dry/broken pipes since last call, -1 on error or if no - * plugins reply in 10 seconds. + * @param channels array of channels, channels that break may be set to NULL + * @param num_channels length of the 'channels' array + * @param proc function to call to process messages (may be called + * more than once) + * @param proc_cls closure for 'proc' + * @return -1 on error, 1 on success */ -static int -wait_for_reply (struct EXTRACTOR_PluginList *plugins, - EXTRACTOR_MetaDataProcessor proc, void *proc_cls) +int +EXTRACTOR_IPC_channel_recv_ (struct EXTRACTOR_Channel **channels, + unsigned int num_channels, + EXTRACTOR_ChannelMessageProcessor proc, + void *proc_cls) { - int result; DWORD ms; DWORD first_ready; DWORD dwresult; DWORD bytes_read; BOOL bresult; unsigned int i; + unsigned int c; HANDLE events[MAXIMUM_WAIT_OBJECTS]; - struct EXTRACTOR_PluginList *ppos; - i = 0; - for (ppos = plugins; NULL != ppos; ppos = ppos->next) + c = 0; + for (i = 0; i < num_channels; i++) { - if (i == MAXIMUM_WAIT_OBJECTS) - return -1; - if (ppos->seek_request == -1) + if (NULL == channels[i]) continue; - switch (ppos->flags) + if (MAXIMUM_WAIT_OBJECTS == c) + return -1; + if (WaitForSingleObject (channels[i]->ov_read.hEvent, 0) == WAIT_OBJECT_0) { - case EXTRACTOR_OPTION_DEFAULT_POLICY: - case EXTRACTOR_OPTION_OUT_OF_PROCESS_NO_RESTART: - if (WaitForSingleObject (ppos->ov_read.hEvent, 0) == WAIT_OBJECT_0) + ResetEvent (channels[i]->ov_read.hEvent); + bresult = ReadFile (channels[i]->cpipe_out, &i, 0, &bytes_read, &channels[i]->ov_read); + if (bresult == TRUE) { - ResetEvent (ppos->ov_read.hEvent); - bresult = ReadFile (ppos->cpipe_out, &i, 0, &bytes_read, &ppos->ov_read); - if (bresult == TRUE) - { - SetEvent (ppos->ov_read.hEvent); - } - else - { - DWORD err = GetLastError (); - if (err != ERROR_IO_PENDING) - SetEvent (ppos->ov_read.hEvent); - } + SetEvent (channels[i]->ov_read.hEvent); + } + else + { + DWORD err = GetLastError (); + if (err != ERROR_IO_PENDING) + SetEvent (channels[i]->ov_read.hEvent); } - events[i] = ppos->ov_read.hEvent; - i++; - break; - case EXTRACTOR_OPTION_IN_PROCESS: - break; - case EXTRACTOR_OPTION_DISABLED: - break; } + events[c] = channels[i]->ov_read.hEvent; + c++; } ms = 10000; - first_ready = WaitForMultipleObjects (i, events, FALSE, ms); + first_ready = WaitForMultipleObjects (c, events, FALSE, ms); if (first_ready == WAIT_TIMEOUT || first_ready == WAIT_FAILED) + { /* an error or timeout -> something's wrong or all plugins hung up */ + LOG_STRERROR ("WaitForMultipleObjects"); return -1; + } i = 0; - result = 0; - for (ppos = plugins; NULL != ppos; ppos = ppos->next) + for (i = 0; i < num_channels; i++) { - int read_result; - switch (ppos->flags) + if (NULL == channels[i]) + continue; + dwresult = WaitForSingleObject (channels[i]->ov_read.hEvent, 0); + if (dwresult == WAIT_OBJECT_0) { - case EXTRACTOR_OPTION_DEFAULT_POLICY: - case EXTRACTOR_OPTION_OUT_OF_PROCESS_NO_RESTART: - if (ppos->seek_request == -1) - continue; - if (i < first_ready) + int ret; + bresult = ReadFile (channels[i]->cpipe_out, + &channels[i]->data[channels[i]->size], + MAX_META_DATA - channels[i]->size, &bytes_read, NULL); + if (bresult) + ret = EXTRACTOR_IPC_process_reply_ (channels[i]->plugin, + channels[i]->data, channels[i]->size + bytes_read, proc, proc_cls); + if (!bresult || -1 == ret) { - i += 1; - continue; + if (!bresult) + LOG_STRERROR ("ReadFile"); + EXTRACTOR_IPC_channel_destroy_ (channels[i]); + channels[i] = NULL; } - dwresult = WaitForSingleObject (ppos->ov_read.hEvent, 0); - read_result = 0; - if (dwresult == WAIT_OBJECT_0) + else { - read_result = receive_reply (ppos, proc, proc_cls); - result += 1; + memmove (channels[i]->data, &channels[i]->data[ret], + channels[i]->size + bytes_read - ret); + channels[i]->size = channels[i]->size + bytes_read- ret; } - if (dwresult == WAIT_FAILED || read_result < 0) - { - stop_process (ppos); - if (dwresult == WAIT_FAILED) - result += 1; - } - i++; - break; - case EXTRACTOR_OPTION_IN_PROCESS: - break; - case EXTRACTOR_OPTION_DISABLED: - break; } } - return result; + return 1; } diff --git a/src/main/extractor_plugin_main.c b/src/main/extractor_plugin_main.c @@ -126,9 +126,8 @@ plugin_env_seek (void *cls, LOG ("Invalid seek operation\n"); return -1; } - if ( (pos > 0) && - ( (pc->read_position + pos < pc->read_position) || - (pc->read_position + pos > pc->file_size) ) ) + if ((pos > 0) && ((pc->read_position + pos < pc->read_position) || + (pc->read_position + pos > pc->file_size))) { LOG ("Invalid seek operation\n"); return -1; @@ -181,24 +180,26 @@ plugin_env_seek (void *cls, LOG ("Failed to read response to MESSAGE_SEEK\n"); return -1; } - if (MESSAGE_SEEK != reply) + if (MESSAGE_UPDATED_SHM != reply) return -1; /* was likely a MESSAGE_DISCARD_STATE */ if (-1 == EXTRACTOR_read_all_ (pc->in, &um.reserved, sizeof (um) - 1)) { - LOG ("Failed to read UPDATE_MESSAGE\n"); + LOG ("Failed to read MESSAGE_UPDATED_SHM\n"); return -1; } pc->shm_off = um.shm_off; pc->shm_ready_bytes = um.shm_ready_bytes; + pc->file_size = um.file_size; if ( (pc->shm_off <= npos) && - (pc->shm_off + pc->shm_ready_bytes > npos) ) + ((pc->shm_off + pc->shm_ready_bytes > npos) || + (pc->file_size == pc->shm_off)) ) { pc->read_position = npos; return (int64_t) npos; } /* oops, serious missunderstanding, we asked to seek and then were notified about a different position!? */ - LOG ("Got invalid UPDATE_MESSAGE in response to my seek\n"); + LOG ("Got invalid MESSAGE_UPDATED_SHM in response to my seek\n"); return -1; } @@ -222,9 +223,10 @@ plugin_env_read (void *cls, if ( (count + pc->read_position > pc->file_size) || (count + pc->read_position < pc->read_position) ) count = pc->file_size - pc->read_position; - if ( ( (pc->read_position >= pc->shm_off + pc->shm_ready_bytes) || - (pc->read_position < pc->shm_off) ) && - (-1 == plugin_env_seek (pc, pc->read_position, SEEK_SET)) ) + if ((((pc->read_position >= pc->shm_off + pc->shm_ready_bytes) && + (pc->read_position < pc->file_size)) || + (pc->read_position < pc->shm_off)) && + (-1 == plugin_env_seek (pc, pc->read_position, SEEK_SET))) { LOG ("Failed to seek to satisfy read\n"); return -1; @@ -292,6 +294,7 @@ plugin_env_send_proc (void *cls, mime_len = UINT16_MAX; mm.opcode = MESSAGE_META; mm.reserved = 0; + mm.meta_type = type; mm.meta_format = (uint16_t) format; mm.mime_length = (uint16_t) mime_len; mm.value_size = (uint32_t) data_len; @@ -579,6 +582,8 @@ EXTRACTOR_plugin_main_ (struct EXTRACTOR_PluginList *plugin, #if WINDOWS if (NULL != pc.shm) UnmapViewOfFile (pc.shm); + if (NULL != pc.shm_id) + CloseHandle (pc.shm_id); #else if ( (NULL != pc.shm) && (((void*) 1) != pc.shm) ) diff --git a/src/main/test_extractor.c b/src/main/test_extractor.c @@ -20,6 +20,11 @@ /** * @file main/test_extractor.c * @brief plugin for testing GNU libextractor + * Data file (or buffer) for this test must be 150 * 1024 bytes long, + * first 4 bytes must be "test", all other bytes should be equal to + * <FILE_OFFSET> % 256. The test client must return 0 after seeing + * "Hello World!" metadata, and return 1 after seeing "Goodbye!" + * metadata. * @author Christian Grothoff */ #include "platform.h" @@ -44,93 +49,97 @@ EXTRACTOR_test_extract_method (struct EXTRACTOR_ExtractContext *ec) { unsigned char *dp; - if ( (NULL == ec->config) || - (0 != strcmp (ec->config, "test")) ) + if ((NULL == ec->config) || (0 != strcmp (ec->config, "test"))) return; /* only run in test mode */ - if ( (4 != ec->read (ec->cls, - &dp, - 4)) || - (0 != strncmp ("test", - (const char*) dp, - 4) ) ) + if (4 != ec->read (ec->cls, &dp, 4)) { fprintf (stderr, "Reading at offset 0 failed\n"); abort (); } - if (1024 * 150 != - ec->get_size (ec->cls)) + if (0 != strncmp ("test", (const char*) dp, 4)) + { + fprintf (stderr, "Unexpected data at offset 0\n"); + abort (); + } + if (1024 * 150 != ec->get_size (ec->cls)) { fprintf (stderr, "Unexpected file size returned (expected 150k)\n"); abort (); } - if (1024 * 100 != - ec->seek (ec->cls, - 1024 * 100 + 4, - SEEK_SET)) + + if (1024 * 100 + 4 != ec->seek (ec->cls, 1024 * 100 + 4, SEEK_SET)) { fprintf (stderr, "Failure to seek (SEEK_SET)\n"); abort (); } - if ( (1 != ec->read (ec->cls, - &dp, - 1)) || - ((1024 * 100 + 4) % 256 != *dp) ) + if (1 != ec->read (ec->cls, &dp, 1)) { fprintf (stderr, "Failure to read at 100k + 4\n"); abort (); } - if (1024 * 50 - 3 != - ec->seek (ec->cls, - - (1024 * 50 + 7), - SEEK_CUR)) + if ((1024 * 100 + 4) % 256 != *dp) + { + fprintf (stderr, "Unexpected data at offset 100k + 4\n"); + abort (); + } + + if (((1024 * 100 + 4) + 1 - (1024 * 50 + 7)) != + ec->seek (ec->cls, - (1024 * 50 + 7), SEEK_CUR)) { fprintf (stderr, "Failure to seek (SEEK_SET)\n"); abort (); } - if ( (1 != ec->read (ec->cls, - &dp, - 1)) || - ((1024 * 50 - 3) % 256 != *dp) ) + if (1 != ec->read (ec->cls, &dp, 1)) { fprintf (stderr, "Failure to read at 50k - 3\n"); abort (); } - if (1024 * 150 - 3 != - ec->seek (ec->cls, - - 2, - SEEK_END)) + if (((1024 * 100 + 4) + 1 - (1024 * 50 + 7)) % 256 != *dp) { - fprintf (stderr, "Failure to seek (SEEK_SET)\n"); + fprintf (stderr, "Unexpected data at offset 50k - 3\n"); + abort (); + } + + if (1024 * 150 != ec->seek (ec->cls, 0, SEEK_END)) + { + fprintf (stderr, "Failure to seek (SEEK_END)\n"); + abort (); + } + if (0 != ec->read (ec->cls, &dp, 1)) + { + fprintf (stderr, "Failed to receive EOF at 150k\n"); + abort (); + } + + if (1024 * 150 - 2 != ec->seek (ec->cls, -2, SEEK_END)) + { + fprintf (stderr, "Failure to seek (SEEK_END - 2)\n"); + abort (); + } + if (1 != ec->read (ec->cls, &dp, 1)) + { + fprintf (stderr, "Failure to read at 150k - 3\n"); abort (); } - if ( (1 != ec->read (ec->cls, - &dp, - 1)) || - ((1024 * 150 - 2) % 256 != *dp) ) + if ((1024 * 150 - 2) % 256 != *dp) { - fprintf (stderr, "Failure to read at 150k - 2\n"); + fprintf (stderr, "Unexpected data at offset 150k - 3\n"); abort (); } - if (0 != - ec->proc (ec->cls, - "test", - EXTRACTOR_METATYPE_COMMENT, - EXTRACTOR_METAFORMAT_UTF8, - "<no mime>", - "Hello world!", - strlen ("Hello world!") + 1)) + + if (0 != ec->proc (ec->cls, "test", EXTRACTOR_METATYPE_COMMENT, + EXTRACTOR_METAFORMAT_UTF8, "<no mime>", "Hello world!", + strlen ("Hello world!") + 1)) { fprintf (stderr, "Unexpected return value from 'proc'\n"); abort (); } - if (1 != - ec->proc (ec->cls, - "test", - EXTRACTOR_METATYPE_COMMENT, - EXTRACTOR_METAFORMAT_UTF8, - "<no mime>", - "Goodbye!", - strlen ("Goodbye!") + 1)) + /* The test assumes that client orders us to stop extraction + * after seeing "Goodbye!". + */ + if (1 != ec->proc (ec->cls, "test", EXTRACTOR_METATYPE_COMMENT, + EXTRACTOR_METAFORMAT_UTF8, "<no mime>", "Goodbye!", + strlen ("Goodbye!") + 1)) { fprintf (stderr, "Unexpected return value from 'proc'\n"); abort (); diff --git a/src/main/test_ipc.c b/src/main/test_ipc.c @@ -136,7 +136,7 @@ main (int argc, char *argv[]) /* change environment to find 'extractor_test' plugin which is not installed but should be in the current directory (or .libs) on 'make check' */ - if (0 != setenv ("LIBEXTRACTOR_PREFIX", ".:.libs/", 1)) + if (0 != putenv ("LIBEXTRACTOR_PREFIX=." PATH_SEPARATOR_STR ".libs/")) fprintf (stderr, "Failed to update my environment, plugin loading may fail: %s\n", strerror (errno)); diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am @@ -15,14 +15,14 @@ if HAVE_VORBISFILE PLUGIN_OGG=libextractor_ogg.la endif -plugin_LTLIBRARIES = \ - libextractor_id3.la \ - libextractor_id3v2.la \ - libextractor_ebml.la \ - libextractor_s3m.la \ - $(PLUGIN_OGG) \ - libextractor_png.la \ - libextractor_mp3.la +#plugin_LTLIBRARIES = \ +# libextractor_id3.la \ +# libextractor_id3v2.la \ +# libextractor_ebml.la \ +# libextractor_s3m.la \ +# $(PLUGIN_OGG) \ +# libextractor_png.la \ +# libextractor_mp3.la libextractor_mp3_la_SOURCES = \ mp3_extractor.c