aboutsummaryrefslogtreecommitdiff
path: root/src/main/extractor.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/extractor.c')
-rw-r--r--src/main/extractor.c218
1 files changed, 173 insertions, 45 deletions
diff --git a/src/main/extractor.c b/src/main/extractor.c
index 2019fdd..29294a2 100644
--- a/src/main/extractor.c
+++ b/src/main/extractor.c
@@ -102,9 +102,58 @@ struct PluginReplyProcessor
102 */ 102 */
103 void *proc_cls; 103 void *proc_cls;
104 104
105 /**
106 * Are we done with processing this file? 0 to continue, 1 to terminate.
107 */
108 int file_finished;
109
105}; 110};
106 111
107 112
113
114/**
115 * Send a 'discard state' message to the plugin and mark it as finished
116 * for this round.
117 *
118 * @param plugin plugin to notify
119 */
120static void
121send_discard_message (struct EXTRACTOR_PluginList *plugin)
122{
123 static unsigned char disc_msg = MESSAGE_DISCARD_STATE;
124
125 if (sizeof (disc_msg) !=
126 EXTRACTOR_IPC_channel_send_ (plugin->channel,
127 &disc_msg,
128 sizeof (disc_msg)) )
129 {
130 EXTRACTOR_IPC_channel_destroy_ (plugin->channel);
131 plugin->channel = NULL;
132 plugin->round_finished = 1;
133 }
134}
135
136
137/**
138 * We had some serious trouble. Abort all channels.
139 *
140 * @param plugins list of plugins with channels to abort
141 */
142static void
143abort_all_channels (struct EXTRACTOR_PluginList *plugins)
144{
145 struct EXTRACTOR_PluginList *pos;
146
147 for (pos = plugins; NULL != pos; pos = pos->next)
148 {
149 if (NULL == pos->channel)
150 continue;
151 EXTRACTOR_IPC_channel_destroy_ (pos->channel);
152 pos->channel = NULL;
153 }
154}
155
156
108/** 157/**
109 * Handler for a message from one of the plugins. 158 * Handler for a message from one of the plugins.
110 * 159 *
@@ -125,9 +174,35 @@ process_plugin_reply (void *cls,
125 const void *value, 174 const void *value,
126 const char *mime) 175 const char *mime)
127{ 176{
128 // struct PluginReplyProcessor *prp = cls; 177 static unsigned char cont_msg = MESSAGE_CONTINUE_EXTRACTING;
178 struct PluginReplyProcessor *prp = cls;
129 179
130 // FIXME... 180 if (0 != prp->file_finished)
181 {
182 /* client already aborted, ignore message, tell plugin about abort */
183 return;
184 }
185 if (0 != prp->proc (prp->proc_cls,
186 plugin->short_libname,
187 meta_type,
188 meta_format,
189 mime,
190 value,
191 value_len))
192 {
193 prp->file_finished = 1;
194 send_discard_message (plugin);
195 return;
196 }
197 if (sizeof (cont_msg) !=
198 EXTRACTOR_IPC_channel_send_ (plugin->channel,
199 &cont_msg,
200 sizeof (cont_msg)) )
201 {
202 EXTRACTOR_IPC_channel_destroy_ (plugin->channel);
203 plugin->channel = NULL;
204 plugin->round_finished = 1;
205 }
131} 206}
132 207
133 208
@@ -152,6 +227,8 @@ do_extract (struct EXTRACTOR_PluginList *plugins,
152 struct StartMessage start; 227 struct StartMessage start;
153 struct EXTRACTOR_Channel *channel; 228 struct EXTRACTOR_Channel *channel;
154 struct PluginReplyProcessor prp; 229 struct PluginReplyProcessor prp;
230 int64_t min_seek;
231 ssize_t data_available;
155 uint32_t ready; 232 uint32_t ready;
156 int done; 233 int done;
157 234
@@ -163,6 +240,7 @@ do_extract (struct EXTRACTOR_PluginList *plugins,
163 else 240 else
164 ready = 0; 241 ready = 0;
165 242
243 prp.file_finished = 0;
166 prp.proc = proc; 244 prp.proc = proc;
167 prp.proc_cls = proc_cls; 245 prp.proc_cls = proc_cls;
168 246
@@ -171,50 +249,100 @@ do_extract (struct EXTRACTOR_PluginList *plugins,
171 start.reserved = 0; 249 start.reserved = 0;
172 start.reserved2 = 0; 250 start.reserved2 = 0;
173 start.shm_ready_bytes = ready; 251 start.shm_ready_bytes = ready;
174 start.file_size = EXTRACTOR_datasource_get_size_ (ds); 252 start.file_size = EXTRACTOR_datasource_get_size_ (ds);
175 { 253 for (pos = plugins; NULL != pos; pos = pos->next)
176 struct EXTRACTOR_Channel *channels[plugin_count]; 254 {
177 255 if ( (NULL != pos->channel) &&
178 plugin_count = 0; 256 (-1 == EXTRACTOR_IPC_channel_send_ (pos->channel,
179 for (pos = plugins; NULL != pos; pos = pos->next) 257 &start,
180 { 258 sizeof (start)) ) )
181 channels[plugin_count] = pos->channel; 259 {
182 if ( (NULL != pos->channel) && 260 EXTRACTOR_IPC_channel_destroy_ (pos->channel);
183 (-1 == EXTRACTOR_IPC_channel_send_ (pos->channel, 261 pos->channel = NULL;
184 &start, 262 }
185 sizeof (start)) ) ) 263 }
186 { 264 done = 0;
187 channels[plugin_count] = NULL; 265 while (! done)
188 EXTRACTOR_IPC_channel_destroy_ (pos->channel); 266 {
189 pos->channel = NULL; 267 struct EXTRACTOR_Channel *channels[plugin_count];
190 } 268
191 plugin_count++; 269 /* calculate current 'channels' array */
192 } 270 plugin_count = 0;
193 done = 0; 271 for (pos = plugins; NULL != pos; pos = pos->next)
194 while (! done) 272 {
195 { 273 channels[plugin_count] = pos->channel;
196 done = 1; 274 plugin_count++;
197 275 }
198 // FIXME: need to handle 'seek' messages from plugins somewhere 276
199 if (-1 == 277 /* give plugins chance to send us meta data, seek or finished messages */
200 EXTRACTOR_IPC_channel_recv_ (channels, 278 if (-1 ==
201 plugin_count, 279 EXTRACTOR_IPC_channel_recv_ (channels,
202 &process_plugin_reply, 280 plugin_count,
203 &prp)) 281 &process_plugin_reply,
282 &prp))
283 {
284 /* serious problem in IPC; reset *all* channels */
285 abort_all_channels (plugins);
204 break; 286 break;
205 plugin_count = 0; 287 }
206 for (pos = plugins; NULL != pos; pos = pos->next) 288 /* calculate minimum seek request (or set done=0 to continue here) */
207 { 289 done = 1;
208 if (NULL != (channel = channels[plugin_count])) 290 min_seek = -1;
209 { 291 for (pos = plugins; NULL != pos; pos = pos->next)
210 // ... FIXME ... 292 {
211 } 293 if ( (1 == pos->round_finished) ||
212 plugin_count++; 294 (NULL == pos->channel) )
213 } 295 continue; /* inactive plugin */
214 // FIXME: need to terminate once all plugins are done... 296 if (-1 == pos->seek_request)
215 done = 0; 297 {
216 } 298 done = 0; /* possibly more meta data at current position! */
217 } 299 break;
300 }
301 if ( (-1 == min_seek) ||
302 (min_seek > pos->seek_request) )
303 {
304 min_seek = pos->seek_request;
305 }
306 }
307 if ( (1 == done) &&
308 (-1 != min_seek) )
309 {
310 /* current position done, but seek requested */
311 done = 0;
312 if (-1 ==
313 (data_available = EXTRACTOR_IPC_shared_memory_set_ (shm,
314 ds,
315 min_seek,
316 DEFAULT_SHM_SIZE)))
317 {
318 abort_all_channels (plugins);
319 break;
320 }
321 }
322 /* if 'prp.file_finished', send 'abort' to plugins;
323 if not, send 'seek' notification to plugins in range */
324 for (pos = plugins; NULL != pos; pos = pos->next)
325 {
326 if (NULL == (channel = channels[plugin_count]))
327 continue;
328 if ( (-1 != pos->seek_request) &&
329 (min_seek <= pos->seek_request) &&
330 (min_seek + data_available > pos->seek_request) )
331 {
332
333 /* FIXME: notify plugin about seek! */
334 pos->seek_request = -1;
335 }
336 if ( (-1 != pos->seek_request) &&
337 (1 == prp.file_finished) )
338 {
339 send_discard_message (pos);
340 pos->round_finished = 1;
341 }
342 if (0 == pos->round_finished)
343 done = 0; /* can't be done, plugin still active */
344 }
345 }
218 346
219 /* run in-process plugins */ 347 /* run in-process plugins */
220 for (pos = plugins; NULL != pos; pos = pos->next) 348 for (pos = plugins; NULL != pos; pos = pos->next)