aboutsummaryrefslogtreecommitdiff
path: root/src/main/extractor_ipc_w32.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/extractor_ipc_w32.c')
-rw-r--r--src/main/extractor_ipc_w32.c761
1 files changed, 309 insertions, 452 deletions
diff --git a/src/main/extractor_ipc_w32.c b/src/main/extractor_ipc_w32.c
index cb37e3e..6a7a2cd 100644
--- a/src/main/extractor_ipc_w32.c
+++ b/src/main/extractor_ipc_w32.c
@@ -23,7 +23,9 @@
23#include "extractor.h" 23#include "extractor.h"
24#include "extractor_datasource.h" 24#include "extractor_datasource.h"
25#include "extractor_plugin_main.h" 25#include "extractor_plugin_main.h"
26#include "extractor_plugins.h"
26#include "extractor_ipc.h" 27#include "extractor_ipc.h"
28#include "extractor_logging.h"
27 29
28/** 30/**
29 */ 31 */
@@ -33,7 +35,7 @@ struct EXTRACTOR_SharedMemory
33 /** 35 /**
34 * W32 handle of the shm into which data is uncompressed 36 * W32 handle of the shm into which data is uncompressed
35 */ 37 */
36 HANDLE shm; 38 HANDLE map;
37 39
38 /** 40 /**
39 * Name of the shm 41 * Name of the shm
@@ -43,12 +45,12 @@ struct EXTRACTOR_SharedMemory
43 /** 45 /**
44 * Pointer to the mapped region of the shm (covers the whole shm) 46 * Pointer to the mapped region of the shm (covers the whole shm)
45 */ 47 */
46 void *shm_ptr; 48 void *ptr;
47 49
48 /** 50 /**
49 * Position within shm 51 * Position within shm
50 */ 52 */
51 int64_t shm_pos; 53 int64_t pos;
52 54
53 /** 55 /**
54 * Allocated size of the shm 56 * Allocated size of the shm
@@ -60,7 +62,12 @@ struct EXTRACTOR_SharedMemory
60 */ 62 */
61 size_t shm_buf_size; 63 size_t shm_buf_size;
62 64
65 size_t shm_map_size;
63 66
67 /**
68 * Reference counter describing how many references share this SHM.
69 */
70 unsigned int rc;
64}; 71};
65 72
66 73
@@ -113,125 +120,135 @@ struct EXTRACTOR_Channel
113 */ 120 */
114 unsigned char *ov_write_buffer; 121 unsigned char *ov_write_buffer;
115 122
123 /**
124 * The plugin this channel is to communicate with.
125 */
126 struct EXTRACTOR_PluginList *plugin;
127
128 /**
129 * Memory segment shared with this process.
130 */
131 struct EXTRACTOR_SharedMemory *shm;
132
133 void *old_buf;
134
135 /**
136 * Buffer for reading data from the plugin.
137 * FIXME: we might want to grow this
138 * buffer dynamically instead of always using 32 MB!
139 */
140 char data[MAX_META_DATA];
141
142 /**
143 * Number of valid bytes in the channel's buffer.
144 */
145 size_t size;
116}; 146};
117 147
118 148
119/** 149/**
120 * Initializes an extracting session for a plugin. 150 * Create a shared memory area.
121 * opens the file/shm (only in OPMODE_FILE)
122 * sets shm_ptr to NULL (unmaps it, if it was mapped)
123 * sets position to 0
124 * initializes file size to 'fsize' (may be -1)
125 * sets seek request to 0
126 * 151 *
127 * @param plugin plugin context 152 * @param size size of the shared area
128 * @param operation_mode the mode of operation (OPMODE_*) 153 * @return NULL on error
129 * @param fsize size of the source file (may be -1) 154 */
130 * @param shm_name name of the shm or file to open 155struct EXTRACTOR_SharedMemory *
131 * @return 0 on success, non-0 on error. 156EXTRACTOR_IPC_shared_memory_create_ (size_t size)
132 */
133static int
134init_state_method (struct EXTRACTOR_PluginList *plugin,
135 uint8_t operation_mode,
136 int64_t fsize,
137 const char *shm_name)
138{ 157{
139 plugin->seek_request = 0; 158 struct EXTRACTOR_SharedMemory *shm;
140 if (plugin->shm_ptr != NULL) 159 const char *tpath = "Local\\";
141 UnmapViewOfFile (plugin->shm_ptr); 160
142 plugin->shm_ptr = NULL; 161 if (NULL == (shm = malloc (sizeof (struct EXTRACTOR_SharedMemory))))
143 if (INVALID_HANDLE_VALUE == plugin_open_shm (plugin, shm_name)) 162 return NULL;
144 return 1; 163
145 plugin->fsize = fsize; 164 snprintf (shm->shm_name, MAX_SHM_NAME,
146 plugin->shm_pos = 0; 165 "%slibextractor-shm-%u-%u",
147 plugin->fpos = 0; 166 tpath, getpid(),
148 return 0; 167 (unsigned int) RANDOM());
168 shm->map = CreateFileMapping (INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, size, shm->shm_name);
169 shm->ptr = MapViewOfFile (shm->map, FILE_MAP_WRITE, 0, 0, size);
170 if (shm->ptr == NULL)
171 {
172 CloseHandle (shm->map);
173 free (shm);
174 return NULL;
175 }
176 shm->shm_size = size;
177 shm->rc = 0;
178 return shm;
149} 179}
150 180
151/** 181/**
152 * Opens a shared memory object (for later mmapping). 182 * Change the reference counter for this shm instance.
153 * This is W32 variant of the plugin_open_* function.
154 * Opened shm might be memory-backed or file-backed (depending on how
155 * it was created). shm_name is never a file name, unlike POSIX.
156 * Closes a shm is already opened, closes it before opening a new one.
157 * 183 *
158 * @param plugin plugin context 184 * @param shm instance to update
159 * @param shm_name name of the shared memory object. 185 * @param delta value to change RC by
160 * @return memory-mapped file handle (NULL on error). That is, the result of OpenFileMapping() syscall. 186 * @return new RC
161 */ 187 */
162static HANDLE 188unsigned int
163plugin_open_shm (struct EXTRACTOR_PluginList *plugin, 189EXTRACTOR_IPC_shared_memory_change_rc_ (struct EXTRACTOR_SharedMemory *shm,
164 const char *shm_name) 190 int delta)
165{ 191{
166 if (plugin->map_handle != 0) 192 shm->rc += delta;
167 CloseHandle (plugin->map_handle); 193 return shm->rc;
168 plugin->map_handle = OpenFileMapping (FILE_MAP_READ, FALSE, shm_name);
169 return plugin->map_handle;
170} 194}
171 195
172 196
173/** 197/**
174 * Another name for plugin_open_shm(). 198 * Destroy shared memory area.
175 */ 199 *
176static HANDLE 200 * @param shm memory area to destroy
177plugin_open_file (struct EXTRACTOR_PluginList *plugin, 201 * @return NULL on error
178 const char *shm_name) 202 */
179{ 203void
180 return plugin_open_shm (plugin, shm_name); 204EXTRACTOR_IPC_shared_memory_destroy_ (struct EXTRACTOR_SharedMemory *shm)
205{
206 if (shm->ptr != NULL)
207 UnmapViewOfFile (shm->ptr);
208 if (shm->map != 0)
209 CloseHandle (shm->map);
210 free (shm);
181} 211}
182 212
183
184/** 213/**
185 * Initializes an extracting session for a plugin. 214 * Initialize shared memory area from data source.
186 * opens the file/shm (only in OPMODE_FILE)
187 * sets shm_ptr to NULL (unmaps it, if it was mapped)
188 * sets position to 0
189 * initializes file size to 'fsize' (may be -1)
190 * sets seek request to 0
191 * 215 *
192 * @param plugin plugin context 216 * @param shm memory area to initialize
193 * @param operation_mode the mode of operation (OPMODE_*) 217 * @param ds data source to use for initialization
194 * @param fsize size of the source file (may be -1) 218 * @param off offset to use in data source
195 * @param shm_name name of the shm or file to open 219 * @param size number of bytes to copy
196 * @return 0 on success, non-0 on error. 220 * @return -1 on error, otherwise number of bytes copied
197 */ 221 */
198static int 222ssize_t
199init_state_method (struct EXTRACTOR_PluginList *plugin, 223EXTRACTOR_IPC_shared_memory_set_ (struct EXTRACTOR_SharedMemory *shm,
200 uint8_t operation_mode, 224 struct EXTRACTOR_Datasource *ds,
201 int64_t fsize, 225 uint64_t off,
202 const char *shm_name) 226 size_t size)
203{ 227{
204 plugin->seek_request = 0; 228 if (-1 ==
205 if (plugin->shm_ptr != NULL) 229 EXTRACTOR_datasource_seek_ (ds, off, SEEK_SET))
206 UnmapViewOfFile (plugin->shm_ptr); 230 return -1;
207 plugin->shm_ptr = NULL; 231 if (size > shm->shm_size)
208 if (INVALID_HANDLE_VALUE == plugin_open_shm (plugin, shm_name)) 232 size = shm->shm_size;
209 return 1; 233 return EXTRACTOR_datasource_read_ (ds,
210 plugin->fsize = fsize; 234 shm->ptr,
211 plugin->shm_pos = 0; 235 size);
212 plugin->fpos = 0;
213 return 0;
214} 236}
215 237
216 238
217/** 239/**
218 * Deinitializes an extracting session for a plugin. 240 * Query datasource for current position
219 * unmaps shm_ptr (if was mapped)
220 * closes file/shm (if it was opened)
221 * sets map size and shm_ptr to NULL.
222 * 241 *
223 * @param plugin plugin context 242 * @param ds data source to query
224 */ 243 * @return current position in the datasource or UINT_MAX on error
225static void 244 */
226discard_state_method (struct EXTRACTOR_PluginList *plugin) 245uint64_t
246EXTRACTOR_datasource_get_pos_ (struct EXTRACTOR_Datasource *ds)
227{ 247{
228 if (plugin->shm_ptr != NULL) 248 int64_t pos = EXTRACTOR_datasource_seek_ (ds, 0, SEEK_CUR);
229 UnmapViewOfFile (plugin->shm_ptr); 249 if (-1 == pos)
230 if (plugin->map_handle != 0) 250 return UINT_MAX;
231 CloseHandle (plugin->map_handle); 251 return pos;
232 plugin->map_handle = 0;
233 plugin->map_size = 0;
234 plugin->shm_ptr = NULL;
235} 252}
236 253
237 254
@@ -341,80 +358,6 @@ create_selectable_pipe (PHANDLE read_pipe_ptr, PHANDLE write_pipe_ptr,
341 return 0; 358 return 0;
342} 359}
343 360
344
345/**
346 * Writes @size bytes from @buf to @h, using @ov for
347 * overlapped i/o. Deallocates @old_buf and sets it to NULL,
348 * if necessary.
349 * Writes asynchronously, but sequentially (only one writing
350 * operation may be active at any given moment, but it will
351 * be done in background). Thus it is intended to be used
352 * for writing a few big chunks rather than a lot of small pieces.
353 *
354 * The extravagant interface is mainly because this function
355 * does not use a separate struct to group together overlapped
356 * structure, buffer pointer and the handle.
357 *
358 * @param h pipe handle
359 * @param ov overlapped structure pointer
360 * @param buf buffer to read from. Will be copied internally
361 * @param size number of bytes to write
362 * @param old_buf pointer where a copy of previous buffer is stored,
363 * and where a copy of @buf will be stored.
364 *
365 * @return number of bytes written, -1 on error
366 */
367static int
368write_to_pipe (HANDLE h,
369 OVERLAPPED *ov,
370 unsigned char *buf, size_t size,
371 unsigned char **old_buf)
372{
373 DWORD written;
374 BOOL bresult;
375 DWORD err;
376
377 if (WAIT_OBJECT_0 != WaitForSingleObject (ov->hEvent, INFINITE))
378 return -1;
379
380 ResetEvent (ov->hEvent);
381
382 if (*old_buf != NULL)
383 free (*old_buf);
384
385 *old_buf = malloc (size);
386 if (*old_buf == NULL)
387 return -1;
388 memcpy (*old_buf, buf, size);
389 written = 0;
390 ov->Offset = 0;
391 ov->OffsetHigh = 0;
392 ov->Pointer = 0;
393 ov->Internal = 0;
394 ov->InternalHigh = 0;
395 bresult = WriteFile (h, *old_buf, size, &written, ov);
396
397 if (bresult == TRUE)
398 {
399 SetEvent (ov->hEvent);
400 free (*old_buf);
401 *old_buf = NULL;
402 return written;
403 }
404
405 err = GetLastError ();
406 if (err == ERROR_IO_PENDING)
407 return size;
408 SetEvent (ov->hEvent);
409 *old_buf = NULL;
410 SetLastError (err);
411 return -1;
412}
413
414
415#define plugin_write(plug, buf, size) write_to_pipe (plug->cpipe_in, &plug->ov_write, buf, size, &plug->ov_write_buffer)
416
417
418/** 361/**
419 * Communicates plugin data (library name, options) to the plugin 362 * Communicates plugin data (library name, options) to the plugin
420 * process. This is only necessary on W32, where this information 363 * process. This is only necessary on W32, where this information
@@ -425,27 +368,15 @@ write_to_pipe (HANDLE h,
425 * @return 0 on success, -1 on failure 368 * @return 0 on success, -1 on failure
426 */ 369 */
427static int 370static int
428write_plugin_data (struct EXTRACTOR_PluginList *plugin) 371write_plugin_data (struct EXTRACTOR_PluginList *plugin,
372 struct EXTRACTOR_Channel *channel)
429{ 373{
430 size_t libname_len, shortname_len, opts_len; 374 size_t libname_len, shortname_len, opts_len;
431 DWORD len; 375 DWORD len;
432 char *str; 376 char *str;
433 size_t total_len = 0; 377 size_t total_len = 0;
434 unsigned char *buf, *ptr; 378 unsigned char *buf, *ptr;
435 379 ssize_t write_result;
436 switch (plugin->flags)
437 {
438 case EXTRACTOR_OPTION_DEFAULT_POLICY:
439 break;
440 case EXTRACTOR_OPTION_OUT_OF_PROCESS_NO_RESTART:
441 break;
442 case EXTRACTOR_OPTION_IN_PROCESS:
443 return 0;
444 break;
445 case EXTRACTOR_OPTION_DISABLED:
446 return 0;
447 break;
448 }
449 380
450 libname_len = strlen (plugin->libname) + 1; 381 libname_len = strlen (plugin->libname) + 1;
451 total_len += sizeof (size_t) + libname_len; 382 total_len += sizeof (size_t) + libname_len;
@@ -481,24 +412,29 @@ write_plugin_data (struct EXTRACTOR_PluginList *plugin)
481 memcpy (ptr, plugin->plugin_options, opts_len); 412 memcpy (ptr, plugin->plugin_options, opts_len);
482 ptr += opts_len; 413 ptr += opts_len;
483 } 414 }
484 if (total_len != write_to_pipe (plugin->cpipe_in, &plugin->ov_write, buf, total_len, &plugin->ov_write_buffer)) 415 write_result = EXTRACTOR_IPC_channel_send_ (channel, buf, total_len);
485 {
486 free (buf);
487 return -1;
488 }
489 free (buf); 416 free (buf);
490 return 0; 417 return total_len == write_result;
491} 418}
492 419
493
494/** 420/**
495 * Start the process for the given plugin. 421 * Create a channel to communicate with a process wrapping
422 * the plugin of the given name. Starts the process as well.
423 *
424 * @param plugin the plugin
425 * @param shm memory to share with the process
426 * @return NULL on error, otherwise IPC channel
496 */ 427 */
497static void 428struct EXTRACTOR_Channel *
498start_process (struct EXTRACTOR_PluginList *plugin) 429EXTRACTOR_IPC_channel_create_ (struct EXTRACTOR_PluginList *plugin,
430 struct EXTRACTOR_SharedMemory *shm)
499{ 431{
432 struct EXTRACTOR_Channel *channel;
500 HANDLE p1[2]; 433 HANDLE p1[2];
501 HANDLE p2[2]; 434 HANDLE p2[2];
435 struct InitMessage *init;
436 size_t slen;
437
502 STARTUPINFO startup; 438 STARTUPINFO startup;
503 PROCESS_INFORMATION proc; 439 PROCESS_INFORMATION proc;
504 char cmd[MAX_PATH + 1]; 440 char cmd[MAX_PATH + 1];
@@ -507,50 +443,40 @@ start_process (struct EXTRACTOR_PluginList *plugin)
507 HANDLE p21_os_inh = INVALID_HANDLE_VALUE; 443 HANDLE p21_os_inh = INVALID_HANDLE_VALUE;
508 SECURITY_ATTRIBUTES sa; 444 SECURITY_ATTRIBUTES sa;
509 445
510 switch (plugin->flags) 446 if (NULL == (channel = malloc (sizeof (struct EXTRACTOR_Channel))))
511 { 447 {
512 case EXTRACTOR_OPTION_DEFAULT_POLICY: 448 LOG_STRERROR ("malloc");
513 if (plugin->hProcess != INVALID_HANDLE_VALUE && plugin->hProcess != 0) 449 return NULL;
514 return; 450 }
515 break; 451 channel->shm = shm;
516 case EXTRACTOR_OPTION_OUT_OF_PROCESS_NO_RESTART: 452 channel->plugin = plugin;
517 if (plugin->hProcess != 0) 453 channel->size = 0;
518 return;
519 break;
520 case EXTRACTOR_OPTION_IN_PROCESS:
521 return;
522 break;
523 case EXTRACTOR_OPTION_DISABLED:
524 return;
525 break;
526 }
527 454
528 sa.nLength = sizeof (sa); 455 sa.nLength = sizeof (sa);
529 sa.lpSecurityDescriptor = NULL; 456 sa.lpSecurityDescriptor = NULL;
530 sa.bInheritHandle = FALSE; 457 sa.bInheritHandle = FALSE;
531 458
532 plugin->hProcess = NULL;
533
534 if (0 != create_selectable_pipe (&p1[0], &p1[1], &sa, 1024, FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED)) 459 if (0 != create_selectable_pipe (&p1[0], &p1[1], &sa, 1024, FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED))
535 { 460 {
536 plugin->flags = EXTRACTOR_OPTION_DISABLED; 461 LOG_STRERROR ("pipe");
537 return; 462 free (channel);
463 return NULL;
538 } 464 }
539 if (0 != create_selectable_pipe (&p2[0], &p2[1], &sa, 1024, FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED)) 465 if (0 != create_selectable_pipe (&p2[0], &p2[1], &sa, 1024, FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED))
540 { 466 {
467 LOG_STRERROR ("pipe");
541 CloseHandle (p1[0]); 468 CloseHandle (p1[0]);
542 CloseHandle (p1[1]); 469 CloseHandle (p1[1]);
543 plugin->flags = EXTRACTOR_OPTION_DISABLED; 470 free (channel);
544 return; 471 return NULL;
545 } 472 }
546 473
547 memset (&startup, 0, sizeof (STARTUPINFO));
548
549 if (!DuplicateHandle (GetCurrentProcess (), p1[0], GetCurrentProcess (), 474 if (!DuplicateHandle (GetCurrentProcess (), p1[0], GetCurrentProcess (),
550 &p10_os_inh, 0, TRUE, DUPLICATE_SAME_ACCESS) 475 &p10_os_inh, 0, TRUE, DUPLICATE_SAME_ACCESS)
551 || !DuplicateHandle (GetCurrentProcess (), p2[1], GetCurrentProcess (), 476 || !DuplicateHandle (GetCurrentProcess (), p2[1], GetCurrentProcess (),
552 &p21_os_inh, 0, TRUE, DUPLICATE_SAME_ACCESS)) 477 &p21_os_inh, 0, TRUE, DUPLICATE_SAME_ACCESS))
553 { 478 {
479 LOG_STRERROR ("DuplicateHandle");
554 if (p10_os_inh != INVALID_HANDLE_VALUE) 480 if (p10_os_inh != INVALID_HANDLE_VALUE)
555 CloseHandle (p10_os_inh); 481 CloseHandle (p10_os_inh);
556 if (p21_os_inh != INVALID_HANDLE_VALUE) 482 if (p21_os_inh != INVALID_HANDLE_VALUE)
@@ -559,10 +485,14 @@ start_process (struct EXTRACTOR_PluginList *plugin)
559 CloseHandle (p1[1]); 485 CloseHandle (p1[1]);
560 CloseHandle (p2[0]); 486 CloseHandle (p2[0]);
561 CloseHandle (p2[1]); 487 CloseHandle (p2[1]);
562 plugin->flags = EXTRACTOR_OPTION_DISABLED; 488 CloseHandle (p1[0]);
563 return; 489 CloseHandle (p1[1]);
490 free (channel);
491 return NULL;
564 } 492 }
565 493
494 memset (&startup, 0, sizeof (STARTUPINFO));
495
566 /* TODO: write our own plugin-hosting executable? rundll32, for once, has smaller than usual stack size. 496 /* TODO: write our own plugin-hosting executable? rundll32, for once, has smaller than usual stack size.
567 * Also, users might freak out seeing over 9000 rundll32 processes (seeing over 9000 processes named 497 * Also, users might freak out seeing over 9000 rundll32 processes (seeing over 9000 processes named
568 * "libextractor_plugin_helper" is probably less confusing). 498 * "libextractor_plugin_helper" is probably less confusing).
@@ -571,323 +501,250 @@ start_process (struct EXTRACTOR_PluginList *plugin)
571 "rundll32.exe libextractor-3.dll,RundllEntryPoint@16 %lu %lu", 501 "rundll32.exe libextractor-3.dll,RundllEntryPoint@16 %lu %lu",
572 p10_os_inh, p21_os_inh); 502 p10_os_inh, p21_os_inh);
573 cmd[MAX_PATH] = '\0'; 503 cmd[MAX_PATH] = '\0';
574 if (CreateProcessA (NULL, cmd, NULL, NULL, TRUE, 0, NULL, NULL, 504 if (CreateProcessA (NULL, cmd, NULL, NULL, TRUE, CREATE_SUSPENDED, NULL, NULL,
575 &startup, &proc)) 505 &startup, &proc))
576 { 506 {
577 plugin->hProcess = proc.hProcess; 507 channel->hProcess = proc.hProcess;
508 ResumeThread (proc.hThread);
578 CloseHandle (proc.hThread); 509 CloseHandle (proc.hThread);
579 } 510 }
580 else 511 else
581 { 512 {
513 LOG_STRERROR ("CreateProcess");
582 CloseHandle (p1[0]); 514 CloseHandle (p1[0]);
583 CloseHandle (p1[1]); 515 CloseHandle (p1[1]);
584 CloseHandle (p2[0]); 516 CloseHandle (p2[0]);
585 CloseHandle (p2[1]); 517 CloseHandle (p2[1]);
586 plugin->flags = EXTRACTOR_OPTION_DISABLED; 518 free (channel);
587 return; 519 return NULL;
588 } 520 }
589 CloseHandle (p1[0]); 521 CloseHandle (p1[0]);
590 CloseHandle (p2[1]); 522 CloseHandle (p2[1]);
591 CloseHandle (p10_os_inh); 523 CloseHandle (p10_os_inh);
592 CloseHandle (p21_os_inh); 524 CloseHandle (p21_os_inh);
593 525
594 plugin->cpipe_in = p1[1]; 526 channel->cpipe_in = p1[1];
595 plugin->cpipe_out = p2[0]; 527 channel->cpipe_out = p2[0];
596
597 memset (&plugin->ov_read, 0, sizeof (OVERLAPPED));
598 memset (&plugin->ov_write, 0, sizeof (OVERLAPPED));
599 528
600 plugin->ov_write_buffer = NULL; 529 memset (&channel->ov_read, 0, sizeof (OVERLAPPED));
601 530 memset (&channel->ov_write, 0, sizeof (OVERLAPPED));
602 plugin->ov_write.hEvent = CreateEvent (NULL, TRUE, TRUE, NULL);
603 plugin->ov_read.hEvent = CreateEvent (NULL, TRUE, TRUE, NULL);
604}
605 531
532 channel->ov_write_buffer = NULL;
606 533
607/** 534 channel->ov_write.hEvent = CreateEvent (NULL, TRUE, TRUE, NULL);
608 * Receive 'size' bytes from channel, store them in 'buf' 535 channel->ov_read.hEvent = CreateEvent (NULL, TRUE, TRUE, NULL);
609 *
610 * @param plugin plugin context
611 * @param buf buffer to fill
612 * @param size number of bytes to read
613 * @return number of bytes read, 0 on EOS, < 0 on error
614 */
615static int
616plugin_read (struct EXTRACTOR_PluginList *plugin,
617 void *buf,
618 size_t size)
619{
620 char *rb = buf;
621 ssize_t read_result;
622 size_t read_count = 0;
623
624 while (read_count < size)
625 {
626 read_result = read (plugin->cpipe_out,
627 &rb[read_count], size - read_count);
628 if (read_result <= 0)
629 return read_result;
630 read_count += read_result;
631 }
632 return read_count;
633}
634
635
636/**
637 * Stop the child process of this plugin.
638 */
639static void
640stop_process (struct EXTRACTOR_PluginList *plugin)
641{
642 int status;
643 HANDLE process;
644 536
645#if DEBUG 537 if (!write_plugin_data (plugin, channel))
646 if (plugin->hProcess == INVALID_HANDLE_VALUE)
647 fprintf (stderr,
648 "Plugin `%s' choked on this input\n",
649 plugin->short_libname);
650#endif
651 if (plugin->hProcess == INVALID_HANDLE_VALUE ||
652 plugin->hProcess == NULL)
653 return;
654 TerminateProcess (plugin->hProcess, 0);
655 CloseHandle (plugin->hProcess);
656 plugin->hProcess = INVALID_HANDLE_VALUE;
657 CloseHandle (plugin->cpipe_out);
658 CloseHandle (plugin->cpipe_in);
659 plugin->cpipe_out = INVALID_HANDLE_VALUE;
660 plugin->cpipe_in = INVALID_HANDLE_VALUE;
661 CloseHandle (plugin->ov_read.hEvent);
662 CloseHandle (plugin->ov_write.hEvent);
663 if (plugin->ov_write_buffer != NULL)
664 { 538 {
665 free (plugin->ov_write_buffer); 539 LOG_STRERROR ("write_plugin_data");
666 plugin->ov_write_buffer = NULL; 540 EXTRACTOR_IPC_channel_destroy_ (channel);
541 return NULL;
667 } 542 }
668 543
669 if (plugin->flags != EXTRACTOR_OPTION_DEFAULT_POLICY) 544 slen = strlen (shm->shm_name) + 1;
670 plugin->flags = EXTRACTOR_OPTION_DISABLED; 545 if (NULL == (init = malloc (sizeof (struct InitMessage) + slen)))
671 546 {
672 plugin->seek_request = -1; 547 LOG_STRERROR ("malloc");
548 EXTRACTOR_IPC_channel_destroy_ (channel);
549 return NULL;
550 }
551 init->opcode = MESSAGE_INIT_STATE;
552 init->reserved = 0;
553 init->reserved2 = 0;
554 init->shm_name_length = slen;
555 init->shm_map_size = shm->shm_size;
556 memcpy (&init[1], shm->shm_name, slen);
557 if (sizeof (struct InitMessage) + slen !=
558 EXTRACTOR_IPC_channel_send_ (channel, init,
559 sizeof (struct InitMessage) + slen))
560 {
561 LOG ("Failed to send INIT_STATE message to plugin\n");
562 EXTRACTOR_IPC_channel_destroy_ (channel);
563 return NULL;
564 }
565 return channel;
673} 566}
674 567
675
676
677/** 568/**
678 * Setup a shared memory segment. 569 * Destroy communication channel with a plugin/process. Also
570 * destroys the process.
679 * 571 *
680 * @param ptr set to the location of the map segment 572 * @param channel channel to communicate with the plugin
681 * @param map where to store the map handle
682 * @param fn name of the mapping
683 * @param fn_size size available in fn
684 * @param size number of bytes to allocated for the mapping
685 * @return 0 on success
686 */ 573 */
687static int 574void
688make_shm_w32 (void **ptr, 575EXTRACTOR_IPC_channel_destroy_ (struct EXTRACTOR_Channel *channel)
689 HANDLE *map,
690 char *fn,
691 size_t fn_size, size_t size)
692{ 576{
693 const char *tpath = "Local\\"; 577 int status;
694 578
695 snprintf (fn, fn_size, 579 TerminateProcess (channel->hProcess, 0);
696 "%slibextractor-shm-%u-%u", 580 CloseHandle (channel->hProcess);
697 tpath, getpid(), 581 CloseHandle (channel->cpipe_out);
698 (unsigned int) RANDOM()); 582 CloseHandle (channel->cpipe_in);
699 *map = CreateFileMapping (INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, size, fn); 583 CloseHandle (channel->ov_read.hEvent);
700 *ptr = MapViewOfFile (*map, FILE_MAP_WRITE, 0, 0, size); 584 CloseHandle (channel->ov_write.hEvent);
701 if (*ptr == NULL) 585 if (channel->ov_write_buffer != NULL)
702 { 586 {
703 CloseHandle (*map); 587 free (channel->ov_write_buffer);
704 return 1; 588 channel->ov_write_buffer = NULL;
705 } 589 }
706 return 0; 590 free (channel);
707} 591}
708 592
709/** 593/**
710 * Setup a file-backed shared memory segment. 594 * Send data via the given IPC channel (blocking).
711 * 595 *
712 * @param map where to store the map handle 596 * @param channel channel to communicate with the plugin
713 * @param file handle of the file to back the shm 597 * @param buf data to send
714 * @param fn name of the mapping 598 * @param size number of bytes in buf to send
715 * @param fn_size size available in fn 599 * @return -1 on error, number of bytes sent on success
716 * @param size number of bytes to allocated for the mapping 600 * (never does partial writes)
717 * @return 0 on success
718 */ 601 */
719static int 602ssize_t
720make_file_backed_shm_w32 (HANDLE *map, HANDLE file, char *fn, size_t fn_size) 603EXTRACTOR_IPC_channel_send_ (struct EXTRACTOR_Channel *channel,
721{ 604 const void *data,
722 const char *tpath = "Local\\"; 605 size_t size)
723 snprintf (fn, fn_size, "%slibextractor-shm-%u-%u", tpath, getpid(),
724 (unsigned int) RANDOM());
725 *map = CreateFileMapping (file, NULL, PAGE_READONLY, 0, 0, fn);
726 if (*map == NULL)
727 {
728 DWORD err = GetLastError ();
729 return 1;
730 }
731 return 0;
732}
733
734
735static void
736destroy_shm_w32 (void *ptr, HANDLE map)
737{
738 UnmapViewOfFile (ptr);
739 CloseHandle (map);
740}
741
742
743static void
744destroy_file_backed_shm_w32 (HANDLE map)
745{ 606{
746 CloseHandle (map); 607 DWORD written;
747} 608 DWORD err;
609 BOOL bresult;
610 const char *cdata = data;
748 611
612 if (WAIT_OBJECT_0 != WaitForSingleObject (channel->ov_write.hEvent, INFINITE))
613 return -1;
749 614
615 ResetEvent (channel->ov_write.hEvent);
750 616
751#define plugin_write(plug, buf, size) write_all (fileno (plug->cpipe_in), buf, size) 617 if (channel->old_buf != NULL)
618 free (channel->old_buf);
752 619
620 channel->old_buf = malloc (size);
621 if (channel->old_buf == NULL)
622 return -1;
753 623
754/** 624 memcpy (channel->old_buf, data, size);
755 * Receive 'size' bytes from plugin, store them in 'buf' 625 written = 0;
756 * 626 channel->ov_write.Offset = 0;
757 * @param plugin plugin context 627 channel->ov_write.OffsetHigh = 0;
758 * @param buf buffer to fill 628 channel->ov_write.Pointer = 0;
759 * @param size number of bytes to read 629 channel->ov_write.Internal = 0;
760 * @return number of bytes read, 0 on EOS, < 0 on error 630 channel->ov_write.InternalHigh = 0;
761 */ 631 bresult = WriteFile (channel->cpipe_in, channel->old_buf, size, &written, &channel->ov_write);
762static int
763plugin_read (struct EXTRACTOR_PluginList *plugin,
764 void *buf, size_t size)
765{
766 char *rb = buf;
767 DWORD bytes_read;
768 size_t read_count = 0;
769 632
770 while (read_count < size) 633 if (bresult == TRUE)
771 { 634 {
772 if (! ReadFile (plugin->cpipe_out, 635 SetEvent (channel->ov_write.hEvent);
773 &rb[read_count], size - read_count, 636 free (channel->old_buf);
774 &bytes_read, NULL)) 637 channel->old_buf = NULL;
775 return -1; 638 return written;
776 read_count += bytes_read;
777 } 639 }
778 return read_count; 640
641 err = GetLastError ();
642 if (err == ERROR_IO_PENDING)
643 return size;
644 SetEvent (channel->ov_write.hEvent);
645 free (channel->old_buf);
646 channel->old_buf = NULL;
647 SetLastError (err);
648 return -1;
779} 649}
780 650
781 651
782/** 652/**
653 * Receive data from any of the given IPC channels (blocking).
783 * Wait for one of the plugins to reply. 654 * Wait for one of the plugins to reply.
784 * Selects on plugin output pipes, runs receive_reply() 655 * Selects on plugin output pipes, runs 'receive_reply'
785 * on each activated pipe until it gets a seek request 656 * on each activated pipe until it gets a seek request
786 * or a done message. Called repeatedly by the user until all pipes are dry or 657 * or a done message. Called repeatedly by the user until all pipes are dry or
787 * broken. 658 * broken.
788 * This W32 version of wait_for_reply() can't select on more than 64 plugins
789 * at once (returns -1 if there are more than 64 plugins).
790 * 659 *
791 * @param plugins to select upon 660 * @param channels array of channels, channels that break may be set to NULL
792 * @param proc metadata callback 661 * @param num_channels length of the 'channels' array
793 * @param proc_cls callback cls 662 * @param proc function to call to process messages (may be called
794 * @return number of dry/broken pipes since last call, -1 on error or if no 663 * more than once)
795 * plugins reply in 10 seconds. 664 * @param proc_cls closure for 'proc'
665 * @return -1 on error, 1 on success
796 */ 666 */
797static int 667int
798wait_for_reply (struct EXTRACTOR_PluginList *plugins, 668EXTRACTOR_IPC_channel_recv_ (struct EXTRACTOR_Channel **channels,
799 EXTRACTOR_MetaDataProcessor proc, void *proc_cls) 669 unsigned int num_channels,
670 EXTRACTOR_ChannelMessageProcessor proc,
671 void *proc_cls)
800{ 672{
801 int result;
802 DWORD ms; 673 DWORD ms;
803 DWORD first_ready; 674 DWORD first_ready;
804 DWORD dwresult; 675 DWORD dwresult;
805 DWORD bytes_read; 676 DWORD bytes_read;
806 BOOL bresult; 677 BOOL bresult;
807 unsigned int i; 678 unsigned int i;
679 unsigned int c;
808 HANDLE events[MAXIMUM_WAIT_OBJECTS]; 680 HANDLE events[MAXIMUM_WAIT_OBJECTS];
809 struct EXTRACTOR_PluginList *ppos;
810 681
811 i = 0; 682 c = 0;
812 for (ppos = plugins; NULL != ppos; ppos = ppos->next) 683 for (i = 0; i < num_channels; i++)
813 { 684 {
814 if (i == MAXIMUM_WAIT_OBJECTS) 685 if (NULL == channels[i])
815 return -1;
816 if (ppos->seek_request == -1)
817 continue; 686 continue;
818 switch (ppos->flags) 687 if (MAXIMUM_WAIT_OBJECTS == c)
688 return -1;
689 if (WaitForSingleObject (channels[i]->ov_read.hEvent, 0) == WAIT_OBJECT_0)
819 { 690 {
820 case EXTRACTOR_OPTION_DEFAULT_POLICY: 691 ResetEvent (channels[i]->ov_read.hEvent);
821 case EXTRACTOR_OPTION_OUT_OF_PROCESS_NO_RESTART: 692 bresult = ReadFile (channels[i]->cpipe_out, &i, 0, &bytes_read, &channels[i]->ov_read);
822 if (WaitForSingleObject (ppos->ov_read.hEvent, 0) == WAIT_OBJECT_0) 693 if (bresult == TRUE)
823 { 694 {
824 ResetEvent (ppos->ov_read.hEvent); 695 SetEvent (channels[i]->ov_read.hEvent);
825 bresult = ReadFile (ppos->cpipe_out, &i, 0, &bytes_read, &ppos->ov_read); 696 }
826 if (bresult == TRUE) 697 else
827 { 698 {
828 SetEvent (ppos->ov_read.hEvent); 699 DWORD err = GetLastError ();
829 } 700 if (err != ERROR_IO_PENDING)
830 else 701 SetEvent (channels[i]->ov_read.hEvent);
831 {
832 DWORD err = GetLastError ();
833 if (err != ERROR_IO_PENDING)
834 SetEvent (ppos->ov_read.hEvent);
835 }
836 } 702 }
837 events[i] = ppos->ov_read.hEvent;
838 i++;
839 break;
840 case EXTRACTOR_OPTION_IN_PROCESS:
841 break;
842 case EXTRACTOR_OPTION_DISABLED:
843 break;
844 } 703 }
704 events[c] = channels[i]->ov_read.hEvent;
705 c++;
845 } 706 }
846 707
847 ms = 10000; 708 ms = 10000;
848 first_ready = WaitForMultipleObjects (i, events, FALSE, ms); 709 first_ready = WaitForMultipleObjects (c, events, FALSE, ms);
849 if (first_ready == WAIT_TIMEOUT || first_ready == WAIT_FAILED) 710 if (first_ready == WAIT_TIMEOUT || first_ready == WAIT_FAILED)
711 {
850 /* an error or timeout -> something's wrong or all plugins hung up */ 712 /* an error or timeout -> something's wrong or all plugins hung up */
713 LOG_STRERROR ("WaitForMultipleObjects");
851 return -1; 714 return -1;
715 }
852 716
853 i = 0; 717 i = 0;
854 result = 0; 718 for (i = 0; i < num_channels; i++)
855 for (ppos = plugins; NULL != ppos; ppos = ppos->next)
856 { 719 {
857 int read_result; 720 if (NULL == channels[i])
858 switch (ppos->flags) 721 continue;
722 dwresult = WaitForSingleObject (channels[i]->ov_read.hEvent, 0);
723 if (dwresult == WAIT_OBJECT_0)
859 { 724 {
860 case EXTRACTOR_OPTION_DEFAULT_POLICY: 725 int ret;
861 case EXTRACTOR_OPTION_OUT_OF_PROCESS_NO_RESTART: 726 bresult = ReadFile (channels[i]->cpipe_out,
862 if (ppos->seek_request == -1) 727 &channels[i]->data[channels[i]->size],
863 continue; 728 MAX_META_DATA - channels[i]->size, &bytes_read, NULL);
864 if (i < first_ready) 729 if (bresult)
730 ret = EXTRACTOR_IPC_process_reply_ (channels[i]->plugin,
731 channels[i]->data, channels[i]->size + bytes_read, proc, proc_cls);
732 if (!bresult || -1 == ret)
865 { 733 {
866 i += 1; 734 if (!bresult)
867 continue; 735 LOG_STRERROR ("ReadFile");
736 EXTRACTOR_IPC_channel_destroy_ (channels[i]);
737 channels[i] = NULL;
868 } 738 }
869 dwresult = WaitForSingleObject (ppos->ov_read.hEvent, 0); 739 else
870 read_result = 0;
871 if (dwresult == WAIT_OBJECT_0)
872 { 740 {
873 read_result = receive_reply (ppos, proc, proc_cls); 741 memmove (channels[i]->data, &channels[i]->data[ret],
874 result += 1; 742 channels[i]->size + bytes_read - ret);
743 channels[i]->size = channels[i]->size + bytes_read- ret;
875 } 744 }
876 if (dwresult == WAIT_FAILED || read_result < 0)
877 {
878 stop_process (ppos);
879 if (dwresult == WAIT_FAILED)
880 result += 1;
881 }
882 i++;
883 break;
884 case EXTRACTOR_OPTION_IN_PROCESS:
885 break;
886 case EXTRACTOR_OPTION_DISABLED:
887 break;
888 } 745 }
889 } 746 }
890 return result; 747 return 1;
891} 748}
892 749
893 750