libextractor

GNU libextractor
Log | Files | Refs | Submodules | README | LICENSE

commit ec05c1c8a5304f221b6fcea24c67c478893e6c4a
parent 3b99af939bfe580c31d02db6aed4aab92cb2643c
Author: Christian Grothoff <christian@grothoff.org>
Date:   Thu, 12 Apr 2012 16:44:30 +0000

-LRN: minor cleanup, documentation

Diffstat:
Msrc/main/extractor.c | 589+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------
Msrc/main/extractor_plugins.h | 51+++++++++++++++++++++++++++++++++++++++++++++++++--
2 files changed, 570 insertions(+), 70 deletions(-)

diff --git a/src/main/extractor.c b/src/main/extractor.c @@ -49,16 +49,13 @@ #define MAX_READ 32 * 1024 * 1024 /** - * How many bytes do we actually try to decompress? (from the beginning - * of the file). Limit to 16 MB. - */ -#define MAX_DECOMPRESS 16 * 1024 * 1024 - -/** * Maximum length of a Mime-Type string. */ #define MAX_MIME_LEN 256 +/** + * Maximum length of a shared memory object name + */ #define MAX_SHM_NAME 255 /** @@ -67,15 +64,62 @@ */ #define DEBUG 1 +/** + * Sent from LE to a plugin to initialize it (open shm, + * reset position counters etc). + */ #define MESSAGE_INIT_STATE 0x01 + +/** + * Sent from LE to a plugin to tell it that shm contents + * were updated. Only used for OPMODE_COMPRESS. + */ #define MESSAGE_UPDATED_SHM 0x02 + +/** + * Sent from plugin to LE to tell LE that plugin is done + * analyzing current file and will send no more data. + */ #define MESSAGE_DONE 0x03 + +/** + * Sent from plugin to LE to tell LE that plugin needs + * to read a different part of the source file. + */ #define MESSAGE_SEEK 0x04 + +/** + * Sent from plugin to LE to tell LE about metadata discovered. + */ #define MESSAGE_META 0x05 + +/** + * Sent from LE to plugin to make plugin discard its state (unmap + * and close shm). + */ #define MESSAGE_DISCARD_STATE 0x06 +/** + * Client provided a memory buffer, analyze it. Creates a shm, copies + * buffer contents into it. Does not support seeking (all data comes + * in one [big] chunk. + */ #define OPMODE_MEMORY 1 + +/** + * Client provided a memory buffer or a file, which contains compressed data. + * Creates a shm of limited size and repeatedly fills it with uncompressed + * data. Never skips data (has to uncompress every byte, discards unwanted bytes), + * can't efficiently seek backwards. Uses MESSAGE_UPDATED_SHM and MESSAGE_SEEK. + */ #define OPMODE_DECOMPRESS 2 + +/** + * Client provided a filename. 
Creates a file-backed shm (on W32) or just + * communicates the file name to each plugin, and plugin opens its own file + * descriptor of the file (POSIX). Each plugin maps different parts of the + * file into its memory independently. + */ #define OPMODE_FILE 3 /** @@ -92,7 +136,16 @@ struct IpcHeader }; #if !WINDOWS -int +/** + * Opens a shared memory object (for later mmapping). + * This is POSIX variant of the the plugin_open_* function. Shm is always memory-backed. + * Closes a shm is already opened, closes it before opening a new one. + * + * @param plugin plugin context + * @param shm_name name of the shm. + * @return shm id (-1 on error). That is, the result of shm_open() syscall. + */ +static int plugin_open_shm (struct EXTRACTOR_PluginList *plugin, const char *shm_name) { if (plugin->shm_id != -1) @@ -100,7 +153,17 @@ plugin_open_shm (struct EXTRACTOR_PluginList *plugin, const char *shm_name) plugin->shm_id = shm_open (shm_name, O_RDONLY, 0); return plugin->shm_id; } -int + +/** + * Opens a file (for later mmapping). + * This is POSIX variant of the plugin_open_* function. + * Closes a file is already opened, closes it before opening a new one. + * + * @param plugin plugin context + * @param shm_name name of the file to open. + * @return file id (-1 on error). That is, the result of open() syscall. + */ +static int plugin_open_file (struct EXTRACTOR_PluginList *plugin, const char *shm_name) { if (plugin->shm_id != -1) @@ -109,6 +172,17 @@ plugin_open_file (struct EXTRACTOR_PluginList *plugin, const char *shm_name) return plugin->shm_id; } #else +/** + * Opens a shared memory object (for later mmapping). + * This is W32 variant of the plugin_open_* function. + * Opened shm might be memory-backed or file-backed (depending on how + * it was created). shm_name is never a file name, unlike POSIX. + * Closes a shm is already opened, closes it before opening a new one. + * + * @param plugin plugin context + * @param shm_name name of the shared memory object. 
+ * @return memory-mapped file handle (NULL on error). That is, the result of OpenFileMapping() syscall. + */ HANDLE plugin_open_shm (struct EXTRACTOR_PluginList *plugin, const char *shm_name) { @@ -117,6 +191,9 @@ plugin_open_shm (struct EXTRACTOR_PluginList *plugin, const char *shm_name) plugin->map_handle = OpenFileMapping (FILE_MAP_READ, FALSE, shm_name); return plugin->map_handle; } +/** + * Another name for plugin_open_shm(). + */ HANDLE plugin_open_file (struct EXTRACTOR_PluginList *plugin, const char *shm_name) { @@ -124,6 +201,16 @@ plugin_open_file (struct EXTRACTOR_PluginList *plugin, const char *shm_name) } #endif +/** + * Writes @size bytes from @buf into @fd, returns only when + * writing is not possible, or when all @size bytes were written + * (never does partial writes). + * + * @param fd fd to write into + * @param buf buffer to read from + * @param size number of bytes to write + * @return number of bytes written (that is - @size), or -1 on error + */ static int write_all (int fd, const void *buf, @@ -194,7 +281,20 @@ transmit_reply (void *cls, return 0; } -/* init the read/seek wrappers */ +/** + * Initializes an extracting session for a plugin. + * opens the file/shm (only in OPMODE_FILE) + * sets shm_ptr to NULL (unmaps it, if it was mapped) + * sets position to 0 + * initializes file size to @fsize (may be -1) + * sets seek request to 0 + * + * @param plugin plugin context + * @param operation_mode the mode of operation (OPMODE_*) + * @param fsize size of the source file (may be -1) + * @param shm_name name of the shm or file to open + * @return 0 on success, non-0 on error. + */ static int init_state_method (struct EXTRACTOR_PluginList *plugin, uint8_t operation_mode, int64_t fsize, const char *shm_name) { @@ -223,6 +323,14 @@ init_state_method (struct EXTRACTOR_PluginList *plugin, uint8_t operation_mode, return 0; } +/** + * Deinitializes an extracting session for a plugin. 
+ * unmaps shm_ptr (if was mapped) + * closes file/shm (if it was opened) + * sets map size and shm_ptr to NULL. + * + * @param plugin plugin context + */ static void discard_state_method (struct EXTRACTOR_PluginList *plugin) { @@ -243,6 +351,15 @@ discard_state_method (struct EXTRACTOR_PluginList *plugin) plugin->shm_ptr = NULL; } +/** + * Main loop function for plugins. + * Reads a message from the plugin input pipe and acts on it. + * Can be called recursively (once) in OPMODE_DECOMPRESS. + * plugin->waiting_for_update == 1 indicates the recursive call. + * + * @param plugin plugin context + * @return 0, always + */ static int process_requests (struct EXTRACTOR_PluginList *plugin) { @@ -265,6 +382,10 @@ process_requests (struct EXTRACTOR_PluginList *plugin) in = plugin->pipe_in; out = plugin->cpipe_out; + /* The point of recursing into this function is to request + * a seek from LE server and wait for a reply. This snipper + * requests a seek. + */ if (plugin->waiting_for_update == 1) { unsigned char seek_byte = MESSAGE_SEEK; @@ -308,6 +429,7 @@ process_requests (struct EXTRACTOR_PluginList *plugin) do_break = 1; break; } + /* Fsize may be -1 only in decompression mode */ if (plugin->operation_mode != OPMODE_DECOMPRESS && plugin->fsize <= 0) { do_break = 1; @@ -329,6 +451,9 @@ process_requests (struct EXTRACTOR_PluginList *plugin) } shm_name[shm_name_len - 1] = '\0'; do_break = init_state_method (plugin, plugin->operation_mode, plugin->fsize, shm_name); + /* in OPMODE_MEMORY and OPMODE_FILE we can start extracting right away, + * there won't be UPDATED_SHM message, and we don't need it + */ if (!do_break && (plugin->operation_mode == OPMODE_MEMORY || plugin->operation_mode == OPMODE_FILE)) { @@ -369,6 +494,7 @@ process_requests (struct EXTRACTOR_PluginList *plugin) break; } /* FIXME: also check mapped region size (lseek for *nix, VirtualQuery for W32) */ + /* Re-map the shm */ #if !WINDOWS if ((-1 == plugin->shm_id) || (NULL == (plugin->shm_ptr = mmap (NULL, 
plugin->map_size, PROT_READ, MAP_SHARED, plugin->shm_id, 0))) || @@ -387,11 +513,16 @@ process_requests (struct EXTRACTOR_PluginList *plugin) #endif if (plugin->waiting_for_update == 1) { + /* We were only waiting for this one message */ do_break = 1; plugin->waiting_for_update = 2; break; } + /* Run extractor on mapped region (recursive call doesn't reach this + * point and breaks out earlier. + */ extract_reply = plugin->extract_method (plugin, transmit_reply, &out); + /* Unmap the shm */ #if !WINDOWS if ((plugin->shm_ptr != NULL) && (plugin->shm_ptr != (void*) -1) ) @@ -403,6 +534,7 @@ process_requests (struct EXTRACTOR_PluginList *plugin) plugin->shm_ptr = NULL; if (extract_reply == 1) { + /* Tell LE that we're done */ unsigned char done_byte = MESSAGE_DONE; if (write (out, &done_byte, 1) != 1) { @@ -424,6 +556,7 @@ process_requests (struct EXTRACTOR_PluginList *plugin) } else { + /* Tell LE that we're not done, and we need to seek */ unsigned char seek_byte = MESSAGE_SEEK; if (write (out, &seek_byte, 1) != 1) { @@ -439,6 +572,7 @@ process_requests (struct EXTRACTOR_PluginList *plugin) } else { + /* This is mostly to safely skip unrelated messages */ int64_t t; size_t t2; read_result2 = read (in, &t, sizeof (int64_t)); @@ -452,9 +586,8 @@ process_requests (struct EXTRACTOR_PluginList *plugin) } /** - * 'main' function of the child process. Reads shm-filenames from - * 'in' (line-by-line) and writes meta data blocks to 'out'. The meta - * data stream is terminated by an empty entry. + * 'main' function of the child process. Loads the plugin, + * sets up its in and out pipes, then runs the request serving function. * * @param plugin extractor plugin to use * @param in stream to read from @@ -486,6 +619,7 @@ plugin_main (struct EXTRACTOR_PluginList *plugin, int in, int out) close (1); plugin->pipe_in = in; + /* Compiler will complain, and it's right. 
This is a kind of hack...*/ plugin->cpipe_out = out; process_requests (plugin); @@ -606,11 +740,11 @@ stop_process (struct EXTRACTOR_PluginList *plugin) static int write_plugin_data (const struct EXTRACTOR_PluginList *plugin) { - /* only does anything on Windows */ + /* This function is only necessary on W32. On POSIX + * systems plugin inherits its own data from the parent */ return 0; } -#define plugin_print(plug, fmt, ...) fprintf (plug->cpipe_in, fmt, ...) #define plugin_write(plug, buf, size) write_all (fileno (plug->cpipe_in), buf, size) #else /* WINDOWS */ @@ -721,6 +855,28 @@ create_selectable_pipe (PHANDLE read_pipe_ptr, PHANDLE write_pipe_ptr, return 0; } +/** + * Writes @size bytes from @buf to @h, using @ov for + * overlapped i/o. Deallocates @old_buf and sets it to NULL, + * if necessary. + * Writes asynchronously, but sequentially (only one writing + * operation may be active at any given moment, but it will + * be done in background). Thus it is intended to be used + * for writing a few big chunks rather than a lot of small pieces. + * + * The extravagant interface is mainly because this function + * does not use a separate struct to group together overlapped + * structure, buffer pointer and the handle. + * + * @param h pipe handle + * @param ov overlapped structure pointer + * @param buf buffer to read from. Will be copied internally + * @param size number of bytes to write + * @param old_buf pointer where a copy of previous buffer is stored, + * and where a copy of @buf will be stored. + * + * @return number of bytes written, -1 on error + */ static int write_to_pipe (HANDLE h, OVERLAPPED *ov, unsigned char *buf, size_t size, unsigned char **old_buf) { @@ -765,39 +921,17 @@ write_to_pipe (HANDLE h, OVERLAPPED *ov, unsigned char *buf, size_t size, unsign return -1; } -static int -print_to_pipe (HANDLE h, OVERLAPPED *ov, unsigned char **buf, const char *fmt, ...) 
-{ - va_list va; - va_list vacp; - size_t size; - char *print_buf; - int result; - - va_start (va, fmt); - va_copy (vacp, va); - size = VSNPRINTF (NULL, 0, fmt, vacp) + 1; - va_end (vacp); - if (size <= 0) - { - va_end (va); - return size; - } - - print_buf = malloc (size); - if (print_buf == NULL) - return -1; - VSNPRINTF (print_buf, size, fmt, va); - va_end (va); - - result = write_to_pipe (h, ov, print_buf, size, buf); - free (buf); - return result; -} - -#define plugin_print(plug, fmt, ...) print_to_pipe (plug->cpipe_in, &plug->ov_write, &plug->ov_write_buffer, fmt, ...) #define plugin_write(plug, buf, size) write_to_pipe (plug->cpipe_in, &plug->ov_write, buf, size, &plug->ov_write_buffer) +/** + * Communicates plugin data (library name, options) to the plugin + * process. This is only necessary on W32, where this information + * is not inherited by the plugin, because it is not forked. + * + * @param plugin plugin context + * + * @return 0 on success, -1 on failure + */ static int write_plugin_data (struct EXTRACTOR_PluginList *plugin) { @@ -864,6 +998,14 @@ write_plugin_data (struct EXTRACTOR_PluginList *plugin) return 0; } +/** + * Reads plugin data from the LE server process. + * Also initializes allocation granularity (duh...). + * + * @param fd the pipe to read from + * + * @return newly allocated plugin context + */ static struct EXTRACTOR_PluginList * read_plugin_data (int fd) { @@ -995,6 +1137,10 @@ start_process (struct EXTRACTOR_PluginList *plugin) return; } + /* TODO: write our own plugin-hosting executable? rundll32, for once, has smaller than usual stack size. + * Also, users might freak out seeing over 9000 rundll32 processes (seeing over 9000 processes named + * "libextractor_plugin_helper" is probably less confusing). 
+ */ snprintf(cmd, MAX_PATH + 1, "rundll32.exe libextractor-3.dll,RundllEntryPoint@16 %lu %lu", p10_os_inh, p21_os_inh); cmd[MAX_PATH] = '\0'; if (CreateProcessA (NULL, cmd, NULL, NULL, TRUE, 0, NULL, NULL, @@ -1189,10 +1335,10 @@ make_shm_w32 (void **ptr, HANDLE *map, char *fn, size_t fn_size, size_t size) } /** - * Setup a shared memory segment. + * Setup a file-backed shared memory segment. * - * @param ptr set to the location of the map segment * @param map where to store the map handle + * @param file handle of the file to back the shm * @param fn name of the mapping * @param fn_size size available in fn * @param size number of bytes to allocated for the mapping @@ -1283,32 +1429,109 @@ destroy_shm_posix (void *ptr, int shm_id, size_t size, char *shm_name) #define O_LARGEFILE 0 #endif +/** + * A poor attempt to abstract the data source (file or a memory buffer) + * for the decompressor. + */ struct BufferedFileDataSource { + /** + * Descriptor of the file to read data from (may be -1) + */ int fd; + + /** + * Pointer to the buffer to read from (may be NULL) + */ const unsigned char *data; + /** + * Size of the file (or the data buffer) + */ int64_t fsize; + + /** + * Position within the file or the data buffer + */ int64_t fpos; + /** + * A buffer to read into. For fd != -1: when data != NULL, + * data is used directly. + */ unsigned char *buffer; + + /** + * Position within the buffer. 
+ */ int64_t buffer_pos; + + /** + * Number of bytes in the buffer (<= buffer_size) + */ int64_t buffer_bytes; + + /** + * Allocated size of the buffer + */ int64_t buffer_size; }; +/** + * Creates a bfds + * + * @param data data buffer to use as a source (NULL if fd != -1) + * @param fd file descriptor to use as a source (-1 if data != NULL) + * @param fsize size of the file (or the buffer) + * @return newly allocated bfds + */ struct BufferedFileDataSource * bfds_new (const unsigned char *data, int fd, int64_t fsize); +/** + * Unallocates bfds + * + * @param bfds bfds to deallocate + */ void bfds_delete (struct BufferedFileDataSource *bfds); +/** + * Makes bfds seek to @pos and read a chunk of bytes there. + * Changes bfds->fpos, bfds->buffer_bytes and bfds->buffer_pos. + * Does almost nothing for memory-backed bfds. + * + * @param bfds bfds + * @param pos position + * @return 0 on success, -1 on error + */ int bfds_pick_next_buffer_at (struct BufferedFileDataSource *bfds, int64_t pos); +/** + * Makes bfds seek to @pos in @whence mode. + * Will try to seek within the buffer, will move the buffer location if + * the seek request falls outside of the buffer range. + * + * @param bfds bfds + * @param pos position to seek to + * @param whence one of the seek constants (SEEK_CUR, SEEK_SET, SEEK_END) + * @return new absolute position + */ int64_t bfds_seek (struct BufferedFileDataSource *bfds, int64_t pos, int whence); +/** + * Fills @buf_ptr with a pointer to a chunk of data. + * Same as read() but there's no need to allocate or de-allocate the + * memory (since data IS already in memory). + * Will seek if necessary. Will fail if @count exceeds buffer size. 
+ * + * @param bfds bfds + * @param buf_ptr location to store data pointer + * @param count number of bytes to read + * @return number of bytes (<= count) available at location pointed by buf_ptr + */ int64_t bfds_read (struct BufferedFileDataSource *bfds, unsigned char **buf_ptr, int64_t count); @@ -1477,41 +1700,87 @@ enum ExtractorCompressionType COMP_TYPE_BZ2 = 2 }; +/** + * An object from which uncompressed data can be read + */ struct CompressedFileSource { + /** + * The type of compression used in the source + */ enum ExtractorCompressionType compression_type; + /** + * The source of data + */ struct BufferedFileDataSource *bfds; + /** + * Size of the source (same as bfds->fsize) + */ int64_t fsize; + /** + * Position within the source + */ int64_t fpos; + /** + * Total size of the uncompressed data. Remains -1 until + * decompression is finished. + */ int64_t uncompressed_size; + /* unsigned char *buffer; int64_t buffer_bytes; int64_t buffer_len; + */ #if WINDOWS + /** + * W32 handle of the shm into which data is uncompressed + */ HANDLE shm; #else + /** + * POSIX id of the shm into which data is uncompressed + */ int shm; #endif + /** + * Name of the shm + */ char shm_name[MAX_SHM_NAME + 1]; + /** + * Pointer to the mapped region of the shm (covers the whole shm) + */ void *shm_ptr; + /** + * Position within shm + */ int64_t shm_pos; - size_t shm_buf_pos; + /** + * Allocated size of the shm + */ int64_t shm_size; + /** + * Number of bytes in shm (<= shm_size) + */ size_t shm_buf_size; #if HAVE_ZLIB + /** + * ZLIB stream object + */ z_stream strm; - int ret; - size_t pos; + /** + * Length of gzip header (may be 0, in that case ZLIB parses the header) + */ int gzip_header_length; #endif #if HAVE_LIBBZ2 + /** + * BZ2 stream object + */ bz_stream bstrm; - int bret; - size_t bpos; #endif }; @@ -1558,13 +1827,10 @@ cfs_reset_stream_zlib (struct CompressedFileSource *cfs) cfs->fpos = cfs->gzip_header_length; cfs->shm_pos = 0; - cfs->shm_buf_pos = 0; 
cfs->shm_buf_size = 0; #if HAVE_ZLIB z_stream strm; - cfs->ret = 0; - cfs->pos = 0; #endif return 1; } @@ -1575,6 +1841,14 @@ cfs_reset_stream_bz2 (struct CompressedFileSource *cfs) return -1; } +/** + * Resets the compression stream to begin uncompressing + * from the beginning. Used at initialization time, and when + * seeking backward. + * + * @param cfs cfs to reset + * @return 1 on success, -1 on error + */ int cfs_reset_stream (struct CompressedFileSource *cfs) { @@ -1687,10 +1961,11 @@ cfs_init_decompressor_zlib (struct CompressedFileSource *cfs, EXTRACTOR_MetaData return cfs_reset_stream_zlib (cfs); } -int +static int cfs_deinit_decompressor_zlib (struct CompressedFileSource *cfs) { inflateEnd (&cfs->strm); + return 1; } static int @@ -1705,6 +1980,15 @@ cfs_deinit_decompressor_bz2 (struct CompressedFileSource *cfs) return -1; } +/** + * Initializes decompression object. Might report metadata about + * compressed stream, if available. Resets the stream to the beginning. + * + * @param cfs cfs to initialize + * @param proc callback for metadata + * @param proc_cls callback cls + * @return 1 on success, -1 on error + */ static int cfs_init_decompressor (struct CompressedFileSource *cfs, EXTRACTOR_MetaDataProcessor proc, void *proc_cls) { @@ -1719,6 +2003,12 @@ cfs_init_decompressor (struct CompressedFileSource *cfs, EXTRACTOR_MetaDataProce } } +/** + * Deinitializes decompression object. + * + * @param cfs cfs to deinitialize + * @return 1 on success, -1 on error + */ static int cfs_deinit_decompressor (struct CompressedFileSource *cfs) { @@ -1733,6 +2023,16 @@ cfs_deinit_decompressor (struct CompressedFileSource *cfs) } } +/** + * Allocates and initializes new cfs object.
+ * + * @param bfds data source to use + * @param fsize size of the source + * @param compression_type type of compression used + * @param proc metadata callback + * @param proc_cls callback cls + * @return newly allocated cfs on success, NULL on error + */ struct CompressedFileSource * cfs_new (struct BufferedFileDataSource *bfds, int64_t fsize, enum ExtractorCompressionType compression_type, EXTRACTOR_MetaDataProcessor proc, void *proc_cls) { @@ -1761,6 +2061,10 @@ cfs_new (struct BufferedFileDataSource *bfds, int64_t fsize, enum ExtractorCompr return cfs; } +/** + * Data is read from the source and shoved into decompressor + * in chunks this big. + */ #define COM_CHUNK_SIZE (10*1024) int @@ -1801,6 +2105,17 @@ cfs_read_bz2 (struct CompressedFileSource *cfs, int64_t preserve) return -1; } +/** + * Re-fills shm with new uncompressed data, preserving the last + * @preserve bytes of existing data as the first @preserve bytes + * of the new data. + * Does the actual decompression. Will set uncompressed_size on + * the end of compressed stream. + * + * @param cfs cfs to read from + * @param preserve number of bytes to preserve (0 to discard all old data) + * @return number of bytes in shm. 0 if no more data can be uncompressed. + */ int64_t cfs_read (struct CompressedFileSource *cfs, int64_t preserve) { @@ -1844,6 +2159,15 @@ cfs_seek_bz2 (struct CompressedFileSource *cfs, int64_t position) return -1; } +/** + * Moves the buffer to @position in uncompressed stream. If position + * requires seeking backwards beyond the boundaries of the buffer, resets the + * stream and repeats decompression from the beginning to @position.
+ * + * @param cfs cfs to seek on + * @param position new starting point for the buffer + * @return new absolute buffer position, -1 on error or EOS + */ int64_t cfs_seek (struct CompressedFileSource *cfs, int64_t position) { @@ -1920,8 +2244,17 @@ get_compression_type (const unsigned char *data, int fd, int64_t fsize) return result; } +/** + * Initializes plugin state. Calls init_state_method() + * directly or indirectly. + * + * @param plugin plugin to initialize + * @param operation_mode operation mode + * @param shm_name name of the shm/file + * @param fsize file size (may be -1) + */ static void -init_plugin_state (struct EXTRACTOR_PluginList *plugin, uint8_t operation_mode, int fd, const char *shm_name, int64_t fsize) +init_plugin_state (struct EXTRACTOR_PluginList *plugin, uint8_t operation_mode, const char *shm_name, int64_t fsize) { int write_result; int init_state_size; @@ -1970,6 +2303,12 @@ init_plugin_state (struct EXTRACTOR_PluginList *plugin, uint8_t operation_mode, } } +/** + * Discards plugin state. Calls discard_state_method() + * directly or indirectly. + * + * @param plugin plugin whose state is discarded + */ static void discard_plugin_state (struct EXTRACTOR_PluginList *plugin) { @@ -2002,6 +2341,17 @@ discard_plugin_state (struct EXTRACTOR_PluginList *plugin) } } +/** + * Forces plugin to move the buffer window to @pos. + * + * @param plugin plugin context + * @param pos position to move to + * @param want_start 1 if the caller is interested in the beginning of the + * window, 0 if the caller is interested in its end. Window position + * must be aligned to page size, and this parameter controls the + * direction of window shift. 0 is used mostly by SEEK_END.
+ * @return 0 on success, -1 on error + */ static int pl_pick_next_buffer_at (struct EXTRACTOR_PluginList *plugin, int64_t pos, uint8_t want_start) { @@ -2114,6 +2464,7 @@ pl_pick_next_buffer_at (struct EXTRACTOR_PluginList *plugin, int64_t pos, uint8_ int64_t old_pos; old_pos = plugin->fpos + plugin->shm_pos; plugin->seek_request = pos; + /* Recurse into request loop to wait for shm update */ while (plugin->fpos != pos) { plugin->waiting_for_update = 1; @@ -2127,18 +2478,28 @@ pl_pick_next_buffer_at (struct EXTRACTOR_PluginList *plugin, int64_t pos, uint8_ { if (pos < plugin->fpos) { - if (1 != cfs_reset_stream (plugin->state)) + if (1 != cfs_reset_stream (plugin->pass_cfs)) return -1; } while (plugin->fpos < pos && plugin->fpos >= 0) - plugin->fpos = cfs_seek (plugin->state, pos); - plugin->fsize = ((struct CompressedFileSource *)plugin->state)->uncompressed_size; + plugin->fpos = cfs_seek (plugin->pass_cfs, pos); + plugin->fsize = ((struct CompressedFileSource *)plugin->pass_cfs)->uncompressed_size; plugin->shm_pos = pos - plugin->fpos; } return 0; } } +/** + * Moves current absolute buffer position to @pos in @whence mode. + * Will move logical position without shifting the buffer, if possible. + * Will not move beyond the end of file. + * + * @param plugin plugin context + * @param pos position to move to + * @param whence seek mode (SEEK_CUR, SEEK_SET, SEEK_END) + * @return new absolute position, -1 on error + */ int64_t pl_seek (struct EXTRACTOR_PluginList *plugin, int64_t pos, int whence) { @@ -2203,6 +2564,17 @@ pl_get_pos (struct EXTRACTOR_PluginList *plugin) return plugin->fpos + plugin->shm_pos; } +/** + * Fills @data with a pointer to the data buffer. + * Equivalent to read(), except you don't have to allocate and free + * a buffer, since the data is already in memory.
+ * Will move the buffer, if necessary + * + * @param plugin plugin context + * @param data location to store data pointer + * @param count number of bytes to read + * @return number of bytes (<= count) available in @data, -1 on error + */ int64_t pl_read (struct EXTRACTOR_PluginList *plugin, unsigned char **data, size_t count) { @@ -2226,6 +2598,17 @@ pl_read (struct EXTRACTOR_PluginList *plugin, unsigned char **data, size_t count } } +/** + * Transmits information about updated shm to plugin. + * For OPMODE_DECOMPRESS only. + * + * @param plugin plugin context + * @param position current absolute position in uncompressed stream + * @param map_size number of bytes that are available in shm + * @param fsize total size of the uncompressed stream (might be -1) + * @param operation_mode mode of operation + * @return 0 on success, 1 on error + */ static int give_shm_to_plugin (struct EXTRACTOR_PluginList *plugin, int64_t position, size_t map_size, int64_t fsize, uint8_t operation_mode) { @@ -2272,6 +2655,14 @@ give_shm_to_plugin (struct EXTRACTOR_PluginList *plugin, int64_t position, size_ } } +/** + * Calls _extract_method of in-process plugin.
+ * + * @param plugin plugin context + * @param shm_ptr pointer to the data buffer + * @param proc metadata callback + * @param proc_cls callback cls + */ static void ask_in_process_plugin (struct EXTRACTOR_PluginList *plugin, void *shm_ptr, EXTRACTOR_MetaDataProcessor proc, void *proc_cls) { @@ -2297,6 +2688,14 @@ ask_in_process_plugin (struct EXTRACTOR_PluginList *plugin, void *shm_ptr, EXTRA } #if !WINDOWS +/** + * Receive @size bytes from plugin, store them in @buf + * + * @param plugin plugin context + * @param buf buffer to fill + * @param size number of bytes to read + * @return number of bytes read, 0 on EOS, < 0 on error + */ int plugin_read (struct EXTRACTOR_PluginList *plugin, unsigned char *buf, size_t size) { @@ -2312,6 +2711,14 @@ plugin_read (struct EXTRACTOR_PluginList *plugin, unsigned char *buf, size_t siz return read_count; } #else +/** + * Receive @size bytes from plugin, store them in @buf + * + * @param plugin plugin context + * @param buf buffer to fill + * @param size number of bytes to read + * @return number of bytes read, 0 on EOS, < 0 on error + */ int plugin_read (struct EXTRACTOR_PluginList *plugin, unsigned char *buf, size_t size) { @@ -2329,6 +2736,14 @@ plugin_read (struct EXTRACTOR_PluginList *plugin, unsigned char *buf, size_t siz } #endif +/** + * Receive a reply from plugin (seek request, metadata and done message) + * + * @param plugin plugin context + * @param proc metadata callback + * @param proc_cls callback cls + * @return 0 on success, -1 on error + */ static int receive_reply (struct EXTRACTOR_PluginList *plugin, EXTRACTOR_MetaDataProcessor proc, void *proc_cls) { @@ -2397,6 +2812,19 @@ receive_reply (struct EXTRACTOR_PluginList *plugin, EXTRACTOR_MetaDataProcessor } #if !WINDOWS +/** + * Wait for one of the plugins to reply. + * Selects on plugin output pipes, runs receive_reply() + * on each activated pipe until it gets a seek request + * or a done message. 
Called repeatedly by the user until all pipes are dry or + * broken. + * + * @param plugins to select upon + * @param proc metadata callback + * @param proc_cls callback cls + * @return number of dry/broken pipes since last call, -1 on error or if no + * plugins reply in 10 seconds. + */ static int wait_for_reply (struct EXTRACTOR_PluginList *plugins, EXTRACTOR_MetaDataProcessor proc, void *proc_cls) { @@ -2464,6 +2892,21 @@ wait_for_reply (struct EXTRACTOR_PluginList *plugins, EXTRACTOR_MetaDataProcesso return result; } #else +/** + * Wait for one of the plugins to reply. + * Selects on plugin output pipes, runs receive_reply() + * on each activated pipe until it gets a seek request + * or a done message. Called repeatedly by the user until all pipes are dry or + * broken. + * This W32 version of wait_for_reply() can't select on more than 64 plugins + * at once (returns -1 if there are more than 64 plugins). + * + * @param plugins to select upon + * @param proc metadata callback + * @param proc_cls callback cls + * @return number of dry/broken pipes since last call, -1 on error or if no + * plugins reply in 10 seconds. + */ static int wait_for_reply (struct EXTRACTOR_PluginList *plugins, EXTRACTOR_MetaDataProcessor proc, void *proc_cls) { @@ -2563,6 +3006,16 @@ wait_for_reply (struct EXTRACTOR_PluginList *plugins, EXTRACTOR_MetaDataProcesso #endif +/** + * Checks the seek requests that plugins made, finds the one with + * smallest offset from the beginning of the stream, and satisfies it. 
+ * + * @param plugins to check + * @param cfs compressed file source to seek in + * @param current_position current stream position + * @param map_size number of bytes currently buffered + * @return new stream position, -1 on error + */ static int64_t seek_to_new_position (struct EXTRACTOR_PluginList *plugins, struct CompressedFileSource *cfs, int64_t current_position, int64_t map_size) { @@ -2617,9 +3070,9 @@ load_in_process_plugin (struct EXTRACTOR_PluginList *plugin) * @param plugins the list of plugins to use * @param data data to process, or NULL if fds is not -1 * @param fd file to read data from, or -1 if data is not NULL - * @param fsize size of data or size of file - * @param buffer a buffer with data alteady read from the file (if fd != -1) - * @param buffer_size size of buffer + * @param filename name of the file to which fd belongs + * @param cfs compressed file source for compressed stream (may be NULL) + * @param fsize size of the file or data buffer * @param proc function to call for each meta data item found * @param proc_cls cls argument to proc */ @@ -2700,21 +3153,21 @@ do_extract (struct EXTRACTOR_PluginList *plugins, const char *data, int fd, cons if (operation_mode == OPMODE_DECOMPRESS) { for (ppos = plugins; NULL != ppos; ppos = ppos->next) - init_plugin_state (ppos, operation_mode, -1, cfs->shm_name, -1); + init_plugin_state (ppos, operation_mode, cfs->shm_name, -1); } else if (operation_mode == OPMODE_FILE) { for (ppos = plugins; NULL != ppos; ppos = ppos->next) #if !WINDOWS - init_plugin_state (ppos, operation_mode, fd, filename, fsize); + init_plugin_state (ppos, operation_mode, filename, fsize); #else - init_plugin_state (ppos, operation_mode, fd, shm_name, fsize); + init_plugin_state (ppos, operation_mode, shm_name, fsize); #endif } else { for (ppos = plugins; NULL != ppos; ppos = ppos->next) - init_plugin_state (ppos, operation_mode, -1, shm_name, fsize); + init_plugin_state (ppos, operation_mode, shm_name, fsize); } if 
(operation_mode == OPMODE_FILE || operation_mode == OPMODE_MEMORY) @@ -2749,7 +3202,7 @@ do_extract (struct EXTRACTOR_PluginList *plugins, const char *data, int fd, cons { /* Pass this way. we'll need it to call cfs functions later on */ /* This is a special case */ - ppos->state = cfs; + ppos->pass_cfs = cfs; ask_in_process_plugin (ppos, cfs->shm_ptr, proc, proc_cls); } while (plugins_not_ready > 0 && !kill_plugins) diff --git a/src/main/extractor_plugins.h b/src/main/extractor_plugins.h @@ -101,6 +101,10 @@ struct EXTRACTOR_PluginList #else HANDLE cpipe_in; #endif + + /** + * Pipe used by plugin to read from its parent. + */ int pipe_in; /** @@ -110,36 +114,71 @@ struct EXTRACTOR_PluginList int64_t seek_request; #if !WINDOWS + /** + * ID of the shm object + */ int shm_id; #else + /** + * Handle of the shm object + */ HANDLE map_handle; #endif - void *state; + /** + * Used to pass cfs pointer to in-process plugin in OPMODE_DECOMPRESS + */ + void *pass_cfs; + /** + * Uncompressed stream size. Initially -1, until file is fully decompressed + * (for sources that are not compressed it is set from the start). + */ int64_t fsize; + /** + * Absolute position within the stream + */ int64_t fpos; + /** + * Pointer to the shared memory segment + */ unsigned char *shm_ptr; + /** + * Number of bytes in the segment + */ int64_t map_size; + /** + * Position within the segment + */ int64_t shm_pos; +#if !WINDOWS /** * Pipe used to read information about extracted meta data from * the plugin child process. -1 if not initialized. */ -#if !WINDOWS int cpipe_out; #else + /** + * Pipe used to read information about extracted meta data from + * the plugin child process. -1 if not initialized. + */ HANDLE cpipe_out; #endif #if !WINDOWS + /** + * Page size. Mmap offset is a multiple of this number. + */ long allocation_granularity; #else + /** + * Page size. Mmap offset is a multiple of this number. 
+ */ DWORD allocation_granularity; #endif @@ -160,7 +199,15 @@ struct EXTRACTOR_PluginList unsigned char *ov_write_buffer; #endif + /** + * Mode of operation. One of the OPMODE_* constants + */ uint8_t operation_mode; + + /** + * 1 if plugin is currently in a recursive process_requests() call, + * 0 otherwise + */ int waiting_for_update; };