libextractor

GNU libextractor
Log | Files | Refs | Submodules | README | LICENSE

commit 07aa6e57127306886d97c592d7d1f8b8ea9c77d1
parent f81ecad624db4959d5ed4f1320b2d4a5762a2a42
Author: Christian Grothoff <christian@grothoff.org>
Date:   Sat,  4 Aug 2012 17:32:18 +0000

gzip wip

Diffstat:
Msrc/main/extractor.c | 67+++++++++++++++++++++++++++++++++++++++++++++++--------------------
Msrc/main/extractor_datasource.c | 86+++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------
Msrc/main/extractor_datasource.h | 4+++-
Msrc/main/extractor_ipc.c | 6+++++-
Msrc/main/extractor_ipc.h | 11++++++++---
Msrc/main/extractor_ipc_gnu.c | 5++++-
Msrc/main/extractor_plugin_main.c | 41+++++++++++++++++++++++++++++++++++------
Msrc/main/extractor_plugins.h | 11++++++++++-
Msrc/main/test_extractor.c | 10++++------
9 files changed, 176 insertions(+), 65 deletions(-)

diff --git a/src/main/extractor.c b/src/main/extractor.c @@ -82,7 +82,7 @@ send_update_message (struct EXTRACTOR_PluginList *plugin, um.reserved2 = 0; um.shm_ready_bytes = (uint32_t) data_available; um.shm_off = (uint64_t) shm_off; - um.file_size = EXTRACTOR_datasource_get_size_ (ds); + um.file_size = EXTRACTOR_datasource_get_size_ (ds, 0); if (sizeof (um) != EXTRACTOR_IPC_channel_send_ (plugin->channel, &um, @@ -300,7 +300,7 @@ in_process_get_size (void *cls) { struct InProcessContext *ctx = cls; - return (uint64_t) EXTRACTOR_datasource_get_size_ (ctx->ds); + return (uint64_t) EXTRACTOR_datasource_get_size_ (ctx->ds, 0); } @@ -373,8 +373,8 @@ do_extract (struct EXTRACTOR_PluginList *plugins, struct InProcessContext ctx; struct EXTRACTOR_ExtractContext ec; int64_t min_seek; + int64_t end; ssize_t data_available; - uint64_t last_position; uint32_t ready; int done; @@ -386,8 +386,6 @@ do_extract (struct EXTRACTOR_PluginList *plugins, else ready = 0; - last_position = UINT64_MAX; - prp.file_finished = 0; prp.proc = proc; prp.proc_cls = proc_cls; @@ -397,7 +395,7 @@ do_extract (struct EXTRACTOR_PluginList *plugins, start.reserved = 0; start.reserved2 = 0; start.shm_ready_bytes = ready; - start.file_size = EXTRACTOR_datasource_get_size_ (ds); + start.file_size = EXTRACTOR_datasource_get_size_ (ds, 0); for (pos = plugins; NULL != pos; pos = pos->next) { if ( (NULL != pos->channel) && @@ -453,18 +451,37 @@ do_extract (struct EXTRACTOR_PluginList *plugins, if ( (1 == pos->round_finished) || (NULL == pos->channel) ) continue; /* inactive plugin */ - if (((last_position <= pos->seek_request) && - (last_position + data_available > pos->seek_request)) && - ((data_available > 0) || - (last_position == EXTRACTOR_datasource_get_size_ (ds)))) + if (-1 == pos->seek_request) { - done = 0; /* possibly more meta data at current position! */ + /* possibly more meta data at current position, at least + this plugin is still working on it... */ + done = 0; break; } - if ( (-1 == min_seek) || - (min_seek > pos->seek_request) ) + if (-1 != pos->seek_request) { - min_seek = pos->seek_request; + if (SEEK_END == pos->seek_whence) + { + /* convert distance from end to absolute position */ + pos->seek_whence = 0; + end = EXTRACTOR_datasource_get_size_ (ds, 1); + if (pos->seek_request > end) + { + LOG ("Cannot seek to before the beginning of the file!\n"); + pos->seek_request = 0; + } + else + { + pos->seek_request = end - pos->seek_request; + } + } + if ( (-1 == min_seek) || + (min_seek > pos->seek_request) ) + { + LOG ("Updating min seek to %llu\n", + (unsigned long long) pos->seek_request); + min_seek = pos->seek_request; + } } } if ( (1 == done) && @@ -482,6 +499,10 @@ do_extract (struct EXTRACTOR_PluginList *plugins, abort_all_channels (plugins); break; } + LOG ("Seeking to %lld, got %d bytes ready there\n", + (long long) min_seek, + (int) data_available); + } /* if 'prp.file_finished', send 'abort' to plugins; if not, send 'seek' notification to plugins in range */ @@ -496,23 +517,29 @@ do_extract (struct EXTRACTOR_PluginList *plugins, pos->round_finished = 1; pos->seek_request = -1; } - if ( (-1 != pos->seek_request) && ((last_position > pos->seek_request) || - (last_position + data_available <= pos->seek_request)) && + if ( (-1 != pos->seek_request) && (min_seek <= pos->seek_request) && - ((min_seek + data_available > pos->seek_request) || - (min_seek == EXTRACTOR_datasource_get_size_ (ds))) ) + ( (min_seek + data_available > pos->seek_request) || + (min_seek == EXTRACTOR_datasource_get_size_ (ds, 0))) ) { + LOG ("Notifying plugin about seek\n"); send_update_message (pos, min_seek, data_available, ds); pos->seek_request = -1; } + else + { + if (-1 != pos->seek_request) + LOG ("Skipping plugin, seek %lld not in range %llu-%llu\n", + (long long) pos->seek_request, + min_seek, + min_seek + data_available); + } if (0 == pos->round_finished) done = 0; /* can't be done, plugin still active */ } - if (min_seek >= 0) - last_position = min_seek; } /* run in-process plugins */ diff --git a/src/main/extractor_datasource.c b/src/main/extractor_datasource.c @@ -59,7 +59,8 @@ * Data is read from the source and shoved into decompressor * in chunks this big. */ -#define COM_CHUNK_SIZE (10 * 1024) +#define COM_CHUNK_SIZE (12345) +// #define COM_CHUNK_SIZE (10 * 1024) /** @@ -230,7 +231,10 @@ bfds_pick_next_buffer_at (struct BufferedFileDataSource *bfds, } position = (int64_t) LSEEK (bfds->fd, pos, SEEK_SET); if (position < 0) - return -1; + { + LOG_STRERROR ("lseek"); + return -1; + } bfds->fpos = position; bfds->buffer_pos = 0; rd = read (bfds->fd, bfds->buffer, bfds->buffer_size); @@ -374,18 +378,19 @@ bfds_seek (struct BufferedFileDataSource *bfds, return -1; } if ( (NULL == bfds->buffer) || - ( (bfds->buffer_pos <= pos) && - (bfds->buffer_pos + bfds->buffer_bytes > pos) ) ) + ( (bfds->buffer_pos + bfds->fpos <= pos) && + (bfds->buffer_pos + bfds->fpos + bfds->buffer_bytes > pos) ) ) { - bfds->buffer_pos = pos; - return bfds->buffer_pos; + bfds->buffer_pos = pos - bfds->fpos; + return pos; } if (0 != bfds_pick_next_buffer_at (bfds, pos)) { LOG ("seek operation failed\n"); return -1; } - return bfds->fpos; + ASSERT (pos == bfds->fpos + bfds->buffer_pos); + return pos; } return -1; } @@ -453,13 +458,9 @@ cfs_reset_stream_zlib (struct CompressedFileSource *cfs) { if (cfs->gzip_header_length != bfds_seek (cfs->bfds, cfs->gzip_header_length, SEEK_SET)) - return 0; - cfs->strm.next_in = NULL; - cfs->strm.avail_in = 0; - cfs->strm.total_in = 0; - cfs->strm.zalloc = NULL; - cfs->strm.zfree = NULL; - cfs->strm.opaque = NULL; + return -1; + memset (&cfs->strm, 0, sizeof (z_stream)); + cfs->strm.avail_out = COM_CHUNK_SIZE; /* * note: maybe plain inflateInit(&strm) is adequate, @@ -624,7 +625,6 @@ cfs_init_decompressor_zlib (struct CompressedFileSource *cfs, /* zlib will take care of its header */ gzip_header_length = 0; #endif - cfs->gzip_header_length = gzip_header_length; return cfs_reset_stream_zlib (cfs); } @@ -797,12 +797,15 @@ cfs_read_zlib (struct CompressedFileSource *cfs, char buf[COM_CHUNK_SIZE]; if (cfs->fpos == cfs->uncompressed_size) - return 0; + { + LOG ("fpos at EOF, returning 0 from cfs_read_zlib\n"); + return 0; + } rc = 0; - if (cfs->strm.avail_out > 0) + if (COM_CHUNK_SIZE > cfs->strm.avail_out + cfs->result_pos) { /* got left-over decompressed data from previous round! */ - in = cfs->strm.avail_out; + in = COM_CHUNK_SIZE - (cfs->strm.avail_out + cfs->result_pos); if (in > size) in = size; memcpy (&dst[rc], &cfs->result[cfs->result_pos], in); @@ -816,8 +819,11 @@ cfs_read_zlib (struct CompressedFileSource *cfs, /* read block from original data source */ in = bfds_read (cfs->bfds, buf, sizeof (buf)); - if (in <= 0) - return -1; /* unexpected EOF */ + if (in < 0) + { + LOG ("unexpected EOF\n"); + return -1; /* unexpected EOF */ + } cfs->strm.next_in = (unsigned char *) buf; cfs->strm.avail_in = (uInt) in; cfs->strm.next_out = (unsigned char *) cfs->result; @@ -825,12 +831,18 @@ cfs_read_zlib (struct CompressedFileSource *cfs, cfs->result_pos = 0; ret = inflate (&cfs->strm, Z_SYNC_FLUSH); if ( (Z_OK != ret) && (Z_STREAM_END != ret) ) - return -1; /* unexpected error */ + { + LOG ("unexpected gzip inflate error: %d\n", ret); + return -1; /* unexpected error */ + } /* go backwards by the number of bytes left in the buffer */ if (-1 == bfds_seek (cfs->bfds, - (int64_t) cfs->strm.avail_in, SEEK_CUR)) - return -1; + { + LOG ("seek failed\n"); + return -1; + } /* copy decompressed bytes to target buffer */ - in = cfs->strm.total_out; + in = COM_CHUNK_SIZE - cfs->strm.avail_out; if (in > size - rc) in = size - rc; memcpy (&dst[rc], &cfs->result[cfs->result_pos], in); @@ -840,6 +852,8 @@ cfs_read_zlib (struct CompressedFileSource *cfs, } if (Z_STREAM_END == ret) cfs->uncompressed_size = cfs->fpos; + LOG ("returning %d from cfs_read_zlib\n", + (int) rc); return rc; } @@ -963,7 +977,6 @@ cfs_seek (struct CompressedFileSource *cfs, LOG ("Invalid seek operation\n"); return -1; } - delta = nposition - cfs->fpos; if (delta < 0) { @@ -1253,15 +1266,36 @@ EXTRACTOR_datasource_seek_ (void *cls, * Determine the overall size of the data source (after compression). * * @param cls must be a 'struct EXTRACTOR_Datasource' + * @param force force computing the size if it is unavailable * @return overall file size, UINT64_MAX on error or unknown */ int64_t -EXTRACTOR_datasource_get_size_ (void *cls) +EXTRACTOR_datasource_get_size_ (void *cls, + int force) { struct EXTRACTOR_Datasource *ds = cls; + char buf[32 * 1024]; + uint64_t pos; if (NULL != ds->cfs) - return ds->cfs->uncompressed_size; + { + if ( (force) && + (-1 == ds->cfs->uncompressed_size) ) + { + pos = ds->cfs->fpos; + LOG ("seeking to end\n"); + while ( (-1 == ds->cfs->uncompressed_size) && + (-1 != cfs_read (ds->cfs, buf, sizeof (buf))) ) ; + if (-1 == cfs_seek (ds->cfs, pos, SEEK_SET)) + { + LOG ("Serious problem, I moved the buffer to determine the file size but could not restore it...\n"); + return -1; + } + LOG ("File size is %llu bytes decompressed\n", + (unsigned long long) ds->cfs->uncompressed_size); + } + return ds->cfs->uncompressed_size; + } return ds->bfds->fsize; } diff --git a/src/main/extractor_datasource.h b/src/main/extractor_datasource.h @@ -105,10 +105,12 @@ EXTRACTOR_datasource_seek_ (void *cls, * Determine the overall size of the data source (after compression). * * @param cls must be a 'struct EXTRACTOR_Datasource' + * @param force force computing the size if it is unavailable * @return overall file size, -1 on error or unknown */ int64_t -EXTRACTOR_datasource_get_size_ (void *cls); +EXTRACTOR_datasource_get_size_ (void *cls, + int force); #endif diff --git a/src/main/extractor_ipc.c b/src/main/extractor_ipc.c @@ -68,7 +68,11 @@ EXTRACTOR_IPC_process_reply_ (struct EXTRACTOR_PluginList *plugin, return 0; } memcpy (&seek, cdata, sizeof (seek)); - plugin->seek_request = seek.file_offset; + plugin->seek_request = (int64_t) seek.file_offset; + plugin->seek_whence = seek.whence; + LOG ("Received %d-seek request to %llu\n", + (int) seek.whence, + (unsigned long long) seek.file_offset); return sizeof (struct SeekRequestMessage); case MESSAGE_META: /* Meta */ if (size < sizeof (struct MetaMessage)) diff --git a/src/main/extractor_ipc.h b/src/main/extractor_ipc.h @@ -231,9 +231,11 @@ struct SeekRequestMessage unsigned char reserved; /** - * Always zero. + * 'whence' value for the seek operation; + * 0 = SEEK_SET, 1 = SEEK_CUR, 2 = SEEK_END. + * Note that 'SEEK_CUR' is never used here. */ - uint16_t reserved2; + uint16_t whence; /** * Number of bytes requested for SHM. @@ -241,7 +243,10 @@ struct SeekRequestMessage uint32_t requested_bytes; /** - * Requested offset. + * Requested offset; a positive value from the end of the + * file is used of 'whence' is SEEK_END; a postive value + * from the start is used of 'whence' is SEEK_SET. + * 'SEEK_CUR' is never used. */ uint64_t file_offset; diff --git a/src/main/extractor_ipc_gnu.c b/src/main/extractor_ipc_gnu.c @@ -228,7 +228,10 @@ EXTRACTOR_IPC_shared_memory_set_ (struct EXTRACTOR_SharedMemory *shm, { if (-1 == EXTRACTOR_datasource_seek_ (ds, off, SEEK_SET)) - return -1; + { + LOG ("Failed to set IPC memory due to seek error\n"); + return -1; + } if (size > shm->shm_size) size = shm->shm_size; return EXTRACTOR_datasource_read_ (ds, diff --git a/src/main/extractor_plugin_main.c b/src/main/extractor_plugin_main.c @@ -117,6 +117,7 @@ plugin_env_seek (void *cls, struct UpdateMessage um; uint64_t npos; unsigned char reply; + uint16_t wval; switch (whence) { @@ -133,6 +134,7 @@ plugin_env_seek (void *cls, return -1; } npos = (uint64_t) (pc->read_position + pos); + wval = 0; break; case SEEK_END: if (pos > 0) @@ -140,6 +142,12 @@ plugin_env_seek (void *cls, LOG ("Invalid seek operation\n"); return -1; } + if (UINT64_MAX == pc->file_size) + { + wval = 2; + npos = (uint64_t) - pos; + break; + } pos = (int64_t) (pc->file_size + pos); /* fall-through! */ case SEEK_SET: @@ -149,13 +157,15 @@ plugin_env_seek (void *cls, return -1; } npos = (uint64_t) pos; + wval = 0; break; default: LOG ("Invalid seek operation\n"); return -1; } if ( (pc->shm_off <= npos) && - (pc->shm_off + pc->shm_ready_bytes > npos) ) + (pc->shm_off + pc->shm_ready_bytes > npos) && + (0 == wval) ) { pc->read_position = npos; return (int64_t) npos; @@ -163,10 +173,17 @@ plugin_env_seek (void *cls, /* need to seek */ srm.opcode = MESSAGE_SEEK; srm.reserved = 0; - srm.reserved2 = 0; + srm.whence = wval; srm.requested_bytes = pc->shm_map_size; - if (srm.requested_bytes > pc->file_size - npos) - srm.requested_bytes = pc->file_size - npos; + if (0 == wval) + { + if (srm.requested_bytes > pc->file_size - npos) + srm.requested_bytes = pc->file_size - npos; + } + else + { + srm.requested_bytes = npos; + } srm.file_offset = npos; if (-1 == EXTRACTOR_write_all_ (pc->out, &srm, sizeof (srm))) { @@ -181,7 +198,10 @@ plugin_env_seek (void *cls, return -1; } if (MESSAGE_UPDATED_SHM != reply) - return -1; /* was likely a MESSAGE_DISCARD_STATE */ + { + LOG ("Unexpected reply %d to seek\n", reply); + return -1; /* was likely a MESSAGE_DISCARD_STATE */ + } if (-1 == EXTRACTOR_read_all_ (pc->in, &um.reserved, sizeof (um) - 1)) { LOG ("Failed to read MESSAGE_UPDATED_SHM\n"); @@ -190,6 +210,11 @@ plugin_env_seek (void *cls, pc->shm_off = um.shm_off; pc->shm_ready_bytes = um.shm_ready_bytes; pc->file_size = um.file_size; + if (2 == wval) + { + /* convert offset to be absolute from beginning of the file */ + npos = pc->file_size - npos; + } if ( (pc->shm_off <= npos) && ((pc->shm_off + pc->shm_ready_bytes > npos) || (pc->file_size == pc->shm_off)) ) @@ -199,7 +224,11 @@ plugin_env_seek (void *cls, } /* oops, serious missunderstanding, we asked to seek and then were notified about a different position!? */ - LOG ("Got invalid MESSAGE_UPDATED_SHM in response to my seek\n"); + LOG ("Got invalid MESSAGE_UPDATED_SHM in response to my %d-seek (%llu not in %llu-%llu)\n", + (int) wval, + (unsigned long long) npos, + (unsigned long long) pc->shm_off, + (unsigned long long) pc->shm_off + pc->shm_ready_bytes); return -1; } diff --git a/src/main/extractor_plugins.h b/src/main/extractor_plugins.h @@ -89,7 +89,9 @@ struct EXTRACTOR_PluginList /** * A position this plugin wants us to seek to. -1 if it's finished. - * Starts at 0. + * A positive value from the end of the file is used of 'whence' is + * SEEK_END; a postive value from the start is used of 'whence' is + * SEEK_SET. 'SEEK_CUR' is never used. */ int64_t seek_request; @@ -104,6 +106,13 @@ struct EXTRACTOR_PluginList */ int round_finished; + /** + * 'whence' value for the seek operation; + * 0 = SEEK_SET, 1 = SEEK_CUR, 2 = SEEK_END. + * Note that 'SEEK_CUR' is never used here. + */ + uint16_t seek_whence; + }; diff --git a/src/main/test_extractor.c b/src/main/test_extractor.c @@ -67,7 +67,6 @@ EXTRACTOR_test_extract_method (struct EXTRACTOR_ExtractContext *ec) fprintf (stderr, "Unexpected file size returned (expected 150k)\n"); abort (); } - if (1024 * 100 + 4 != ec->seek (ec->cls, 1024 * 100 + 4, SEEK_SET)) { fprintf (stderr, "Failure to seek (SEEK_SET)\n"); @@ -83,7 +82,6 @@ EXTRACTOR_test_extract_method (struct EXTRACTOR_ExtractContext *ec) fprintf (stderr, "Unexpected data at offset 100k + 4\n"); abort (); } - if (((1024 * 100 + 4) + 1 - (1024 * 50 + 7)) != ec->seek (ec->cls, - (1024 * 50 + 7), SEEK_CUR)) { @@ -97,10 +95,10 @@ EXTRACTOR_test_extract_method (struct EXTRACTOR_ExtractContext *ec) } if (((1024 * 100 + 4) + 1 - (1024 * 50 + 7)) % 256 != * (unsigned char *) dp) { - fprintf (stderr, "Unexpected data at offset 50k - 3\n"); + fprintf (stderr, "Unexpected data at offset 100k - 3\n"); abort (); } - + fprintf (stderr, "Seeking to 150k\n"); if (1024 * 150 != ec->seek (ec->cls, 0, SEEK_END)) { fprintf (stderr, "Failure to seek (SEEK_END)\n"); @@ -111,12 +109,12 @@ EXTRACTOR_test_extract_method (struct EXTRACTOR_ExtractContext *ec) fprintf (stderr, "Failed to receive EOF at 150k\n"); abort (); } - if (1024 * 150 - 2 != ec->seek (ec->cls, -2, SEEK_END)) { fprintf (stderr, "Failure to seek (SEEK_END - 2)\n"); abort (); } + fprintf (stderr, "Reading at 150k - 2\n"); if (1 != ec->read (ec->cls, &dp, 1)) { fprintf (stderr, "Failure to read at 150k - 3\n"); @@ -127,7 +125,7 @@ EXTRACTOR_test_extract_method (struct EXTRACTOR_ExtractContext *ec) fprintf (stderr, "Unexpected data at offset 150k - 3\n"); abort (); } - + fprintf (stderr, "Good at 150k\n"); if (0 != ec->proc (ec->cls, "test", EXTRACTOR_METATYPE_COMMENT, EXTRACTOR_METAFORMAT_UTF8, "<no mime>", "Hello world!", strlen ("Hello world!") + 1))