From e7292e8db0955dc14a3b3e67ae709a186ea68136 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 24 Jul 2012 15:30:16 +0000 Subject: -hxing --- src/include/extractor.h | 8 +- src/main/extractor_plugin_main.c | 414 ++++++++++++++++++++------------------- 2 files changed, 215 insertions(+), 207 deletions(-) diff --git a/src/include/extractor.h b/src/include/extractor.h index e862813..a227ea5 100644 --- a/src/include/extractor.h +++ b/src/include/extractor.h @@ -418,12 +418,12 @@ struct EXTRACTOR_ExtractContext * @param cls the 'cls' member of this struct * @param pos position to seek (see 'man lseek') * @param whence how to see (absolute to start, relative, absolute to end) - * @return new absolute position, UINT64_MAX on error (i.e. desired position + * @return new absolute position, -1 on error (i.e. desired position * does not exist) */ - uint64_t (*seek) (void *cls, - int64_t pos, - int whence); + int64_t (*seek) (void *cls, + int64_t pos, + int whence); /** diff --git a/src/main/extractor_plugin_main.c b/src/main/extractor_plugin_main.c index 9cf7624..0f5b79f 100644 --- a/src/main/extractor_plugin_main.c +++ b/src/main/extractor_plugin_main.c @@ -27,6 +27,7 @@ #include "plibc.h" #include "extractor.h" #include "extractor_datasource.h" +#include "extractor_plugins.h" #include "extractor_ipc.h" #include "extractor_plugin_main.h" #include @@ -36,26 +37,62 @@ #include - /** - * Opens a file (for later mmapping). - * This is POSIX variant of the plugin_open_* function. - * Closes a file is already opened, closes it before opening a new one. - * Destroy shared memory area. - * - * @param plugin plugin context - * @param shm_name name of the file to open. - * @return file id (-1 on error). That is, the result of open() syscall. - */ -static int -plugin_open_file (struct EXTRACTOR_PluginList *plugin, - const char *shm_name) + * Closure we use for processing requests inside the helper process. + */ +struct ProcessingContext { - if (plugin->shm_id != -1) - close (plugin->shm_id); - plugin->shm_id = open (shm_name, O_RDONLY, 0); - return plugin->shm_id; -} + /** + * Our plugin handle. + */ + struct EXTRACTOR_PluginList *plugin; + + /** + * Shared memory area. + */ + void *shm; + + /** + * Overall size of the file. + */ + uint64_t file_size; + + /** + * Current read offset when reading from the SHM. + */ + uint64_t read_position; + + /** + * Current offset of the SHM in the file. + */ + uint64_t shm_off; + + /** + * Handle to the shared memory. + */ + int shm_id; + + /** + * Size of the shared memory map. + */ + uint32_t shm_map_size; + + /** + * Number of bytes ready in SHM. + */ + uint32_t shm_ready_bytes; + + /** + * Input stream. + */ + int in; + + /** + * Output stream. + */ + int out; +}; + /** @@ -69,8 +106,12 @@ plugin_open_file (struct EXTRACTOR_PluginList *plugin, * @return new absolute position, -1 on error */ static int64_t -pl_seek (struct EXTRACTOR_PluginList *plugin, int64_t pos, int whence) +plugin_env_seek (void *cls, + int64_t pos, + int whence) { + struct ProcessingContext *pc = cls; + switch (whence) { case SEEK_CUR: @@ -123,13 +164,6 @@ pl_seek (struct EXTRACTOR_PluginList *plugin, int64_t pos, int whence) } -static int64_t -pl_get_fsize (struct EXTRACTOR_PluginList *plugin) -{ - return plugin->fsize; -} - - /** * Fills @data with a pointer to the data buffer. * Equivalent to read(), except you don't have to allocate and free @@ -141,9 +175,12 @@ pl_get_fsize (struct EXTRACTOR_PluginList *plugin) * @param count number of bytes to read * @return number of bytes (<= count) avalable in @data, -1 on error */ -static int64_t -pl_read (struct EXTRACTOR_PluginList *plugin, unsigned char **data, size_t count) +static ssize_t +plugin_env_read (void *cls, + unsigned char **data, size_t count) { + struct ProcessingContext *pc = cls; + *data = NULL; if (count > MAX_READ) return -1; @@ -166,41 +203,11 @@ pl_read (struct EXTRACTOR_PluginList *plugin, unsigned char **data, size_t count } -/** - * Initializes an extracting session for a plugin. - * opens the file/shm (only in OPMODE_FILE) - * sets shm_ptr to NULL (unmaps it, if it was mapped) - * sets position to 0 - * initializes file size to 'fsize' (may be -1) - * sets seek request to 0 - * - * @param plugin plugin context - * @param operation_mode the mode of operation (OPMODE_*) - * @param fsize size of the source file (may be -1) - * @param shm_name name of the shm or file to open - * @return 0 on success, non-0 on error. - */ -static int -init_state_method (struct EXTRACTOR_PluginList *plugin, - uint8_t operation_mode, - int64_t fsize, - const char *shm_name) +static uint64_t +plugin_env_get_size (void *cls) { - plugin->seek_request = 0; - if (plugin->shm_ptr != NULL) - munmap (plugin->shm_ptr, plugin->map_size); - plugin->shm_ptr = NULL; - if (operation_mode == OPMODE_FILE) - { - if (-1 == plugin_open_file (plugin, shm_name)) - return 1; - } - else if (-1 == plugin_open_shm (plugin, shm_name)) - return 1; - plugin->fsize = fsize; - plugin->shm_pos = 0; - plugin->fpos = 0; - return 0; + struct ProcessingContext *pc = cls; + return pc->file_size; } @@ -208,7 +215,7 @@ init_state_method (struct EXTRACTOR_PluginList *plugin, * Function called by a plugin in a child process. Transmits * the meta data back to the parent process. * - * @param cls closure, "int*" of the FD for transmission + * @param cls closure, "struct ProcessingContext" with the FD for transmission * @param plugin_name name of the plugin that produced this value; * special values can be used (i.e. '' for zlib being * used in the main libextractor library and yielding @@ -222,16 +229,17 @@ init_state_method (struct EXTRACTOR_PluginList *plugin, * @return 0 to continue extracting, 1 to abort (transmission error) */ static int -transmit_reply (void *cls, - const char *plugin_name, - enum EXTRACTOR_MetaType type, - enum EXTRACTOR_MetaFormat format, - const char *data_mime_type, - const char *data, - size_t data_len) +plugin_env_send_proc (void *cls, + const char *plugin_name, + enum EXTRACTOR_MetaType type, + enum EXTRACTOR_MetaFormat format, + const char *data_mime_type, + const char *data, + size_t data_len) { + struct ProcessingContext *pc = cls; static const unsigned char meta_byte = MESSAGE_META; - int *cpipe_out = cls; + int cpipe_out = pc->out; struct IpcHeader hdr; size_t mime_len; @@ -263,132 +271,126 @@ transmit_reply (void *cls, /** - * Main loop function for plugins. Reads a message from the plugin - * input pipe and acts on it. + * Handle an init message. The opcode itself has already been read. * - * @param plugin plugin context - * @param in input stream with incoming requests - * @param out output stream for sending responses - */ -static void -process_requests (struct EXTRACTOR_PluginList *plugin, - int in, - int out) + * @param pc processing context + * @return 0 on success, -1 on error + */ +static int +handle_init_message (struct ProcessingContext *pc) { - int read_result1; - int read_result2; - int read_result3; - int read_result4; - unsigned char code; - char *shm_name = NULL; - size_t shm_name_len; - int extract_reply; - struct IpcHeader hdr; - int do_break; -#ifdef WINDOWS - HANDLE map; - MEMORY_BASIC_INFORMATION mi; -#endif + struct InitMessage init; - /* The point of recursing into this function is to request - * a seek from LE server and wait for a reply. This snipper - * requests a seek. - */ - if (plugin->waiting_for_update == 1) + if (NULL != pc->shm) + return -1; + if (sizeof (struct InitMessage) - 1 + != read (pc->in, + &init.reserved, + sizeof (struct InitMessage) - 1)) + return -1; + if (init.shm_name_length > MAX_SHM_NAME) + return -1; { - unsigned char seek_byte = MESSAGE_SEEK; - if (write (out, &seek_byte, 1) != 1) + char shm_name[init.shm_name_length + 1]; + + if (init.shm_name_length + != read (pc->in, + shm_name, + init.shm_name_length)) return -1; - if (write (out, &plugin->seek_request, sizeof (int64_t)) != sizeof (int64_t)) + shm_name[init.shm_name_length] = '\0'; + + pc->shm_map_size = init.shm_map_size; +#if WINDOWS + pc->shm_ptr = MapViewOfFile (pc->shm_id, FILE_MAP_READ, 0, 0, 0); + if (NULL == pc->shm_ptr) + return -1; +#else + pc->shm_id = open (shm_name, O_RDONLY, 0); + if (-1 == pc->shm_id) return -1; + pc->shm = mmap (NULL, + pc->shm_map_size, + PROT_READ, + MAP_SHARED, + pc->shm_id, 0); + if ( ((void*) -1) == pc->shm) + return -1; +#endif } + return 0; +} - memset (&hdr, 0, sizeof (hdr)); - do_break = 0; - while (!do_break) - { - read_result1 = read (in, &code, 1); - if (read_result1 <= 0) - break; - switch (code) + +/** + * Handle a start message. The opcode itself has already been read. + * + * @param pc processing context + * @return 0 on success, -1 on error + */ +static int +handle_start_message (struct ProcessingContext *pc) +{ + struct StartMessage start; + struct EXTRACTOR_ExtractContext ec; + + if (sizeof (struct StartMessage) - 1 + != read (pc->in, + &start.reserved, + sizeof (struct StartMessage) - 1)) + return -1; + pc->shm_ready_bytes = start.shm_ready_bytes; + pc->file_size = start.shm_file_size; + pc->read_position = 0; + pc->shm_off = 0; + ec.cls = pc; + ec.config = pc->plugin->plugin_options; + ec.read = &plugin_env_read; + ec.seek = &plugin_env_seek; + ec.get_size = &plugin_env_get_size; + ec.proc = &plugin_env_send_proc; + pc->plugin->extract_method (&ec); +} + + +/** + * Main loop function for plugins. Reads a message from the plugin + * input pipe and acts on it. + * + * @param pc processing context + */ +static void +process_requests (struct ProcessingContext *pc) +{ + while (1) { - case MESSAGE_INIT_STATE: - read_result2 = read (in, &plugin->operation_mode, sizeof (uint8_t)); - read_result3 = read (in, &plugin->fsize, sizeof (int64_t)); - read_result4 = read (in, &shm_name_len, sizeof (size_t)); - if ((read_result2 < sizeof (uint8_t)) || - (read_result3 < sizeof (int64_t)) || - (read_result4 < sizeof (size_t))) - { - do_break = 1; - break; - } - if (plugin->operation_mode != OPMODE_MEMORY && - plugin->operation_mode != OPMODE_DECOMPRESS && - plugin->operation_mode != OPMODE_FILE) - { - do_break = 1; - break; - } - if ((plugin->operation_mode == OPMODE_MEMORY || - plugin->operation_mode == OPMODE_DECOMPRESS) && - shm_name_len > MAX_SHM_NAME) - { - do_break = 1; - break; - } - /* Fsize may be -1 only in decompression mode */ - if (plugin->operation_mode != OPMODE_DECOMPRESS && plugin->fsize <= 0) - { - do_break = 1; - break; - } - if (shm_name != NULL) - free (shm_name); - shm_name = malloc (shm_name_len); - if (shm_name == NULL) - { - do_break = 1; - break; - } - read_result2 = read (in, shm_name, shm_name_len); - if (read_result2 < shm_name_len) - { - do_break = 1; - break; - } - shm_name[shm_name_len - 1] = '\0'; - do_break = init_state_method (plugin, plugin->operation_mode, plugin->fsize, shm_name); - /* in OPMODE_MEMORY and OPMODE_FILE we can start extracting right away, - * there won't be UPDATED_SHM message, and we don't need it - */ - if (!do_break && (plugin->operation_mode == OPMODE_MEMORY || - plugin->operation_mode == OPMODE_FILE)) - { - extract_reply = plugin->extract_method (plugin, transmit_reply, &out); - unsigned char done_byte = MESSAGE_DONE; - if (write (out, &done_byte, 1) != 1) - { - do_break = 1; - break; - } - if ((plugin->specials != NULL) && - (NULL != strstr (plugin->specials, "force-kill"))) - { - /* we're required to die after each file since this - plugin only supports a single file at a time */ -#if !WINDOWS - fsync (out); -#else - _commit (out); -#endif - _exit (0); - } - } - break; - case MESSAGE_DISCARD_STATE: - discard_state_method (plugin); - break; + unsigned char code; + + if (1 != read (pc->in, &code, 1)) + break; + switch (code) + { + case MESSAGE_INIT_STATE: + if (0 != handle_init_message (pc)) + return; + break; + case MSG_EXTRACT_START: + if (0 != handle_start_message (pc)) + return; + case MSG_UPDATED_SHM: + /* not allowed here, we're not waiting for SHM to move! */ + return; + case MSG_DISCARD_STATE: + /* odd, we're already in the start state... */ + continue; + default: + /* error, unexpected message */ + return; + } + } +} + +#if 0 case MESSAGE_UPDATED_SHM: if (plugin->operation_mode == OPMODE_DECOMPRESS) { @@ -413,7 +415,7 @@ process_requests (struct EXTRACTOR_PluginList *plugin, } #else if ((plugin->map_handle == 0) || - (NULL == (plugin->shm_ptr = MapViewOfFile (plugin->map_handle, FILE_MAP_READ, 0, 0, 0)))) + (NULL == (plugin->shm_ptr = { do_break = 1; break; @@ -478,20 +480,8 @@ process_requests (struct EXTRACTOR_PluginList *plugin, } } } - else - { - /* This is mostly to safely skip unrelated messages */ - int64_t t; - size_t t2; - read_result2 = read (in, &t, sizeof (int64_t)); - read_result3 = read (in, &t2, sizeof (size_t)); - read_result4 = read (in, &t, sizeof (int64_t)); - } - break; - } - } - return 0; } +#endif #ifndef WINDOWS @@ -536,6 +526,8 @@ void EXTRACTOR_plugin_main_ (struct EXTRACTOR_PluginList *plugin, int in, int out) { + struct ProcessingContext pc; + if (0 != EXTRACTOR_plugin_load_ (plugin)) { #if DEBUG @@ -560,7 +552,23 @@ EXTRACTOR_plugin_main_ (struct EXTRACTOR_PluginList *plugin, open_dev_null (1, O_WRONLY); #endif } - process_requests (plugin, in, out); + pc.plugin = plugin; + pc.in = in; + pc.out = out; + pc.shm_id = -1; + pc.shm = NULL; + pc.shm_map_size = 0; + process_requests (&pc); +#if WINDOWS + if (NULL != pc.shm_ptr) + UnmapViewOfFile (pc.shm_ptr); +#else + if ( (NULL != pc.shm_ptr) && + (((void*) 1) != pc.shm_ptr) ) + munmap (pc.shm_ptr, pc.shm_map_size); + if (-1 != pc.shm_id) + (void) close (pc.shm_id); +#endif } -- cgit v1.2.3