commit 29dbb0e97a84fcaede9d333450053e61f4a24a1b
parent 832c37b6370780395554aebef1384f11594bc98f
Author: Christian Grothoff <christian@grothoff.org>
Date: Sun, 29 Jul 2012 21:49:48 +0000
-towards a theoretically working implementation...
Diffstat:
| M | src/main/extractor.c | | | 218 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------- |
1 file changed, 173 insertions(+), 45 deletions(-)
diff --git a/src/main/extractor.c b/src/main/extractor.c
@@ -102,9 +102,58 @@ struct PluginReplyProcessor
*/
void *proc_cls;
+ /**
+ * Are we done with processing this file? 0 to continue, 1 to terminate.
+ */
+ int file_finished;
+
};
+
+/**
+ * Send a 'discard state' message to the plugin and mark it as finished
+ * for this round.
+ *
+ * @param plugin plugin to notify
+ */
+static void
+send_discard_message (struct EXTRACTOR_PluginList *plugin)
+{
+ static unsigned char disc_msg = MESSAGE_DISCARD_STATE;
+
+ if (sizeof (disc_msg) !=
+ EXTRACTOR_IPC_channel_send_ (plugin->channel,
+ &disc_msg,
+ sizeof (disc_msg)) )
+ {
+ EXTRACTOR_IPC_channel_destroy_ (plugin->channel);
+ plugin->channel = NULL;
+ plugin->round_finished = 1;
+ }
+}
+
+
+/**
+ * We had some serious trouble. Abort all channels.
+ *
+ * @param plugins list of plugins with channels to abort
+ */
+static void
+abort_all_channels (struct EXTRACTOR_PluginList *plugins)
+{
+ struct EXTRACTOR_PluginList *pos;
+
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ {
+ if (NULL == pos->channel)
+ continue;
+ EXTRACTOR_IPC_channel_destroy_ (pos->channel);
+ pos->channel = NULL;
+ }
+}
+
+
/**
* Handler for a message from one of the plugins.
*
@@ -125,9 +174,35 @@ process_plugin_reply (void *cls,
const void *value,
const char *mime)
{
- // struct PluginReplyProcessor *prp = cls;
+ static unsigned char cont_msg = MESSAGE_CONTINUE_EXTRACTING;
+ struct PluginReplyProcessor *prp = cls;
- // FIXME...
+ if (0 != prp->file_finished)
+ {
+ /* client already aborted, ignore message, tell plugin about abort */
+ return;
+ }
+ if (0 != prp->proc (prp->proc_cls,
+ plugin->short_libname,
+ meta_type,
+ meta_format,
+ mime,
+ value,
+ value_len))
+ {
+ prp->file_finished = 1;
+ send_discard_message (plugin);
+ return;
+ }
+ if (sizeof (cont_msg) !=
+ EXTRACTOR_IPC_channel_send_ (plugin->channel,
+ &cont_msg,
+ sizeof (cont_msg)) )
+ {
+ EXTRACTOR_IPC_channel_destroy_ (plugin->channel);
+ plugin->channel = NULL;
+ plugin->round_finished = 1;
+ }
}
@@ -152,6 +227,8 @@ do_extract (struct EXTRACTOR_PluginList *plugins,
struct StartMessage start;
struct EXTRACTOR_Channel *channel;
struct PluginReplyProcessor prp;
+ int64_t min_seek;
+ ssize_t data_available;
uint32_t ready;
int done;
@@ -163,6 +240,7 @@ do_extract (struct EXTRACTOR_PluginList *plugins,
else
ready = 0;
+ prp.file_finished = 0;
prp.proc = proc;
prp.proc_cls = proc_cls;
@@ -171,50 +249,100 @@ do_extract (struct EXTRACTOR_PluginList *plugins,
start.reserved = 0;
start.reserved2 = 0;
start.shm_ready_bytes = ready;
- start.file_size = EXTRACTOR_datasource_get_size_ (ds);
- {
- struct EXTRACTOR_Channel *channels[plugin_count];
-
- plugin_count = 0;
- for (pos = plugins; NULL != pos; pos = pos->next)
- {
- channels[plugin_count] = pos->channel;
- if ( (NULL != pos->channel) &&
- (-1 == EXTRACTOR_IPC_channel_send_ (pos->channel,
- &start,
- sizeof (start)) ) )
- {
- channels[plugin_count] = NULL;
- EXTRACTOR_IPC_channel_destroy_ (pos->channel);
- pos->channel = NULL;
- }
- plugin_count++;
- }
- done = 0;
- while (! done)
- {
- done = 1;
-
- // FIXME: need to handle 'seek' messages from plugins somewhere
- if (-1 ==
- EXTRACTOR_IPC_channel_recv_ (channels,
- plugin_count,
- &process_plugin_reply,
- &prp))
+ start.file_size = EXTRACTOR_datasource_get_size_ (ds);
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ {
+ if ( (NULL != pos->channel) &&
+ (-1 == EXTRACTOR_IPC_channel_send_ (pos->channel,
+ &start,
+ sizeof (start)) ) )
+ {
+ EXTRACTOR_IPC_channel_destroy_ (pos->channel);
+ pos->channel = NULL;
+ }
+ }
+ done = 0;
+ while (! done)
+ {
+ struct EXTRACTOR_Channel *channels[plugin_count];
+
+ /* calculate current 'channels' array */
+ plugin_count = 0;
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ {
+ channels[plugin_count] = pos->channel;
+ plugin_count++;
+ }
+
+ /* give plugins chance to send us meta data, seek or finished messages */
+ if (-1 ==
+ EXTRACTOR_IPC_channel_recv_ (channels,
+ plugin_count,
+ &process_plugin_reply,
+ &prp))
+ {
+ /* serious problem in IPC; reset *all* channels */
+ abort_all_channels (plugins);
break;
- plugin_count = 0;
- for (pos = plugins; NULL != pos; pos = pos->next)
- {
- if (NULL != (channel = channels[plugin_count]))
- {
- // ... FIXME ...
- }
- plugin_count++;
- }
- // FIXME: need to terminate once all plugins are done...
- done = 0;
- }
- }
+ }
+ /* calculate minimum seek request (or set done=0 to continue here) */
+ done = 1;
+ min_seek = -1;
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ {
+ if ( (1 == pos->round_finished) ||
+ (NULL == pos->channel) )
+ continue; /* inactive plugin */
+ if (-1 == pos->seek_request)
+ {
+ done = 0; /* possibly more meta data at current position! */
+ break;
+ }
+ if ( (-1 == min_seek) ||
+ (min_seek > pos->seek_request) )
+ {
+ min_seek = pos->seek_request;
+ }
+ }
+ if ( (1 == done) &&
+ (-1 != min_seek) )
+ {
+ /* current position done, but seek requested */
+ done = 0;
+ if (-1 ==
+ (data_available = EXTRACTOR_IPC_shared_memory_set_ (shm,
+ ds,
+ min_seek,
+ DEFAULT_SHM_SIZE)))
+ {
+ abort_all_channels (plugins);
+ break;
+ }
+ }
+ /* if 'prp.file_finished', send 'abort' to plugins;
+ if not, send 'seek' notification to plugins in range */
+ for (pos = plugins; NULL != pos; pos = pos->next)
+ {
+ if (NULL == (channel = channels[plugin_count]))
+ continue;
+ if ( (-1 != pos->seek_request) &&
+ (min_seek <= pos->seek_request) &&
+ (min_seek + data_available > pos->seek_request) )
+ {
+
+ /* FIXME: notify plugin about seek! */
+ pos->seek_request = -1;
+ }
+ if ( (-1 != pos->seek_request) &&
+ (1 == prp.file_finished) )
+ {
+ send_discard_message (pos);
+ pos->round_finished = 1;
+ }
+ if (0 == pos->round_finished)
+ done = 0; /* can't be done, plugin still active */
+ }
+ }
/* run in-process plugins */
for (pos = plugins; NULL != pos; pos = pos->next)