diff options
Diffstat (limited to 'src/main/extractor_ipc_gnu.c')
-rw-r--r-- | src/main/extractor_ipc_gnu.c | 640 |
1 files changed, 274 insertions, 366 deletions
diff --git a/src/main/extractor_ipc_gnu.c b/src/main/extractor_ipc_gnu.c index b7f1824..7e7a086 100644 --- a/src/main/extractor_ipc_gnu.c +++ b/src/main/extractor_ipc_gnu.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of libextractor. | 2 | This file is part of libextractor. |
3 | (C) 2002, 2003, 2004, 2005, 2006, 2009, 2012 Vidyut Samanta and Christian Grothoff | 3 | (C) 2012 Vidyut Samanta and Christian Grothoff |
4 | 4 | ||
5 | libextractor is free software; you can redistribute it and/or modify | 5 | libextractor is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -17,24 +17,51 @@ | |||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | 17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
18 | Boston, MA 02111-1307, USA. | 18 | Boston, MA 02111-1307, USA. |
19 | */ | 19 | */ |
20 | 20 | /** | |
21 | * @file main/extractor_ipc_gnu.c | ||
22 | * @brief IPC with plugin for GNU/POSIX systems | ||
23 | * @author Christian Grothoff | ||
24 | */ | ||
21 | #include "platform.h" | 25 | #include "platform.h" |
22 | #include "plibc.h" | 26 | #include "plibc.h" |
23 | #include "extractor.h" | 27 | #include "extractor.h" |
28 | #include "extractor_datasource.h" | ||
29 | #include "extractor_ipc.h" | ||
24 | #include <dirent.h> | 30 | #include <dirent.h> |
25 | #include <sys/types.h> | 31 | #include <sys/types.h> |
26 | #include <sys/wait.h> | 32 | #include <sys/wait.h> |
27 | #include <sys/shm.h> | 33 | #include <sys/shm.h> |
28 | #include <signal.h> | 34 | #include <signal.h> |
29 | 35 | ||
36 | /** | ||
37 | * Size of the channel buffer; determines largest IPC message that | ||
38 | * is going to be allowed. FIXME: we might want to grow this | ||
39 | * buffer dynamically instead... | ||
40 | */ | ||
41 | #define CHANNEL_BUFFER_SIZE (1024 * 256) | ||
30 | 42 | ||
31 | /** | 43 | /** |
32 | * Definition of an IPC communication channel with | 44 | * A shared memory resource (often shared with several |
33 | * some plugin. | 45 | * other processes). |
34 | */ | 46 | */ |
35 | struct EXTRACTOR_Channel | 47 | struct EXTRACTOR_SharedMemory |
36 | { | 48 | { |
37 | /** | 49 | /** |
50 | * Pointer to the mapped region of the shm (covers the whole shm) | ||
51 | */ | ||
52 | void *shm_ptr; | ||
53 | |||
54 | /** | ||
55 | * Page size. Mmap offset is a multiple of this number. | ||
56 | */ | ||
57 | long allocation_granularity; | ||
58 | |||
59 | /** | ||
60 | * Allocated size of the shm | ||
61 | */ | ||
62 | size_t shm_size; | ||
63 | |||
64 | /** | ||
38 | * POSIX id of the shm into which data is uncompressed | 65 | * POSIX id of the shm into which data is uncompressed |
39 | */ | 66 | */ |
40 | int shm; | 67 | int shm; |
@@ -44,447 +71,328 @@ struct EXTRACTOR_Channel | |||
44 | */ | 71 | */ |
45 | char shm_name[MAX_SHM_NAME + 1]; | 72 | char shm_name[MAX_SHM_NAME + 1]; |
46 | 73 | ||
74 | }; | ||
75 | |||
76 | |||
77 | /** | ||
78 | * Definition of an IPC communication channel with | ||
79 | * some plugin. | ||
80 | */ | ||
81 | struct EXTRACTOR_Channel | ||
82 | { | ||
83 | |||
47 | /** | 84 | /** |
48 | * Pointer to the mapped region of the shm (covers the whole shm) | 85 | * Buffer for reading data from the plugin. |
49 | */ | 86 | */ |
50 | void *shm_ptr; | 87 | char data[CHANNEL_BUFFER_SIZE]; |
51 | 88 | ||
52 | /** | 89 | /** |
53 | * Position within shm | 90 | * Memory segment shared with this process. |
54 | */ | 91 | */ |
55 | int64_t shm_pos; | 92 | struct EXTRACTOR_SharedMemory *shm; |
56 | 93 | ||
57 | /** | 94 | /** |
58 | * Allocated size of the shm | 95 | * Name of the plugin to use for this channel. |
59 | */ | 96 | */ |
60 | int64_t shm_size; | 97 | const char *short_libname; |
61 | 98 | ||
62 | /** | 99 | /** |
63 | * Number of bytes in shm (<= shm_size) | 100 | * Pipe used to communicate information to the plugin child process. |
64 | */ | 101 | * NULL if not initialized. |
65 | size_t shm_buf_size; | 102 | */ |
103 | int cpipe_in; | ||
66 | 104 | ||
105 | /** | ||
106 | * Number of valid bytes in the channel's buffer. | ||
107 | */ | ||
108 | size_t size; | ||
67 | 109 | ||
68 | }; | 110 | /** |
111 | * Pipe used to read information about extracted meta data from | ||
112 | * the plugin child process. -1 if not initialized. | ||
113 | */ | ||
114 | int cpipe_out; | ||
69 | 115 | ||
116 | /** | ||
117 | * Process ID of the child process for this plugin. 0 for none. | ||
118 | */ | ||
119 | pid_t cpid; | ||
70 | 120 | ||
71 | /** | 121 | }; |
72 | * Opens a shared memory object (for later mmapping). | ||
73 | * This is POSIX variant of the the plugin_open_* function. Shm is always memory-backed. | ||
74 | * Closes a shm is already opened, closes it before opening a new one. | ||
75 | * | ||
76 | * @param plugin plugin context | ||
77 | * @param shm_name name of the shm. | ||
78 | * @return shm id (-1 on error). That is, the result of shm_open() syscall. | ||
79 | */ | ||
80 | static int | ||
81 | plugin_open_shm (struct EXTRACTOR_PluginList *plugin, | ||
82 | const char *shm_name) | ||
83 | { | ||
84 | if (plugin->shm_id != -1) | ||
85 | close (plugin->shm_id); | ||
86 | plugin->shm_id = shm_open (shm_name, O_RDONLY, 0); | ||
87 | return plugin->shm_id; | ||
88 | } | ||
89 | 122 | ||
90 | 123 | ||
91 | /** | 124 | /** |
92 | * Opens a file (for later mmapping). | 125 | * Create a shared memory area. |
93 | * This is POSIX variant of the plugin_open_* function. | ||
94 | * Closes a file is already opened, closes it before opening a new one. | ||
95 | * | 126 | * |
96 | * @param plugin plugin context | 127 | * @param size size of the shared area |
97 | * @param shm_name name of the file to open. | 128 | * @return NULL on error |
98 | * @return file id (-1 on error). That is, the result of open() syscall. | 129 | */ |
99 | */ | 130 | struct EXTRACTOR_SharedMemory * |
100 | static int | 131 | EXTRACTOR_IPC_shared_memory_create_ (size_t size) |
101 | plugin_open_file (struct EXTRACTOR_PluginList *plugin, | ||
102 | const char *shm_name) | ||
103 | { | 132 | { |
104 | if (plugin->shm_id != -1) | 133 | struct EXTRACTOR_SharedMemory *shm; |
105 | close (plugin->shm_id); | 134 | const char *tpath; |
106 | plugin->shm_id = open (shm_name, O_RDONLY, 0); | 135 | |
107 | return plugin->shm_id; | 136 | if (NULL == (shm = malloc (sizeof (struct EXTRACTOR_SharedMemory)))) |
137 | return NULL; | ||
138 | #if SOMEBSD | ||
139 | /* this works on FreeBSD, not sure about others... */ | ||
140 | tpath = getenv ("TMPDIR"); | ||
141 | if (tpath == NULL) | ||
142 | tpath = "/tmp/"; | ||
143 | #else | ||
144 | tpath = "/"; /* Linux */ | ||
145 | #endif | ||
146 | snprintf (shm->shm_name, | ||
147 | MAX_SHM_NAME, | ||
148 | "%slibextractor-shm-%u-%u", | ||
149 | tpath, getpid (), | ||
150 | (unsigned int) RANDOM()); | ||
151 | if (-1 == (shm->shm_id = shm_open (shm->shm_name, | ||
152 | O_RDWR | O_CREAT, S_IRUSR | S_IWUSR))) | ||
153 | { | ||
154 | free (shm); | ||
155 | return NULL; | ||
156 | } | ||
157 | if ( (0 != ftruncate (shm->shm_id, size)) || | ||
158 | (NULL == (shm->shm_ptr = mmap (NULL, size, | ||
159 | PROT_WRITE, MAP_SHARED, | ||
160 | shm->shm_id, 0))) || | ||
161 | (((void*) -1) == shm->shm_ptr) ) | ||
162 | { | ||
163 | (void) close (shm->shm_id); | ||
164 | (void) shm_unlink (shm->shm_name); | ||
165 | free (shm); | ||
166 | return NULL; | ||
167 | } | ||
168 | shm->shm_size = size; | ||
169 | return shm; | ||
108 | } | 170 | } |
109 | 171 | ||
110 | 172 | ||
111 | /** | 173 | /** |
112 | * Initializes an extracting session for a plugin. | 174 | * Destroy shared memory area. |
113 | * opens the file/shm (only in OPMODE_FILE) | ||
114 | * sets shm_ptr to NULL (unmaps it, if it was mapped) | ||
115 | * sets position to 0 | ||
116 | * initializes file size to 'fsize' (may be -1) | ||
117 | * sets seek request to 0 | ||
118 | * | 175 | * |
119 | * @param plugin plugin context | 176 | * @param shm memory area to destroy |
120 | * @param operation_mode the mode of operation (OPMODE_*) | 177 | * @return NULL on error |
121 | * @param fsize size of the source file (may be -1) | 178 | */ |
122 | * @param shm_name name of the shm or file to open | 179 | void |
123 | * @return 0 on success, non-0 on error. | 180 | EXTRACTOR_IPC_shared_memory_destroy_ (struct EXTRACTOR_SharedMemory *shm) |
124 | */ | ||
125 | static int | ||
126 | init_state_method (struct EXTRACTOR_PluginList *plugin, | ||
127 | uint8_t operation_mode, | ||
128 | int64_t fsize, | ||
129 | const char *shm_name) | ||
130 | { | 181 | { |
131 | plugin->seek_request = 0; | 182 | munmap (shm->shm_ptr, shm->map_size); |
132 | if (plugin->shm_ptr != NULL) | 183 | (void) close (plugin->shm_id); |
133 | munmap (plugin->shm_ptr, plugin->map_size); | 184 | (void) shm_unlink (shm->shm_name); |
134 | plugin->shm_ptr = NULL; | 185 | free (shm); |
135 | if (operation_mode == OPMODE_FILE) | ||
136 | { | ||
137 | if (-1 == plugin_open_file (plugin, shm_name)) | ||
138 | return 1; | ||
139 | } | ||
140 | else if (-1 == plugin_open_shm (plugin, shm_name)) | ||
141 | return 1; | ||
142 | plugin->fsize = fsize; | ||
143 | plugin->shm_pos = 0; | ||
144 | plugin->fpos = 0; | ||
145 | return 0; | ||
146 | } | 186 | } |
147 | 187 | ||
148 | 188 | ||
149 | /** | 189 | /** |
150 | * Deinitializes an extracting session for a plugin. | 190 | * Initialize shared memory area from data source. |
151 | * unmaps shm_ptr (if was mapped) | ||
152 | * closes file/shm (if it was opened) | ||
153 | * sets map size and shm_ptr to NULL. | ||
154 | * | 191 | * |
155 | * @param plugin plugin context | 192 | * @param shm memory area to initialize |
156 | */ | 193 | * @param ds data source to use for initialization |
157 | static void | 194 | * @param off offset to use in data source |
158 | discard_state_method (struct EXTRACTOR_PluginList *plugin) | 195 | * @param size number of bytes to copy |
196 | * @return -1 on error, otherwise number of bytes copied | ||
197 | */ | ||
198 | ssize_t | ||
199 | EXTRACTOR_IPC_shared_memory_set_ (struct EXTRACTOR_SharedMemory *shm, | ||
200 | struct EXTRACTOR_Datasource *ds, | ||
201 | uint64_t off, | ||
202 | size_t size) | ||
159 | { | 203 | { |
160 | if (plugin->shm_ptr != NULL && plugin->map_size > 0) | 204 | if (-1 == |
161 | munmap (plugin->shm_ptr, plugin->map_size); | 205 | EXTRACTOR_datasource_seek_ (ds, off, SEEK_SET)) |
162 | if (plugin->shm_id != -1) | 206 | return -1; |
163 | close (plugin->shm_id); | 207 | if (size > shm->map_size) |
164 | plugin->shm_id = -1; | 208 | size = shm->map_size; |
165 | plugin->map_size = 0; | 209 | return EXTRACTOR_datasource_read_ (ds, |
166 | plugin->shm_ptr = NULL; | 210 | shm->shm_ptr, |
211 | size); | ||
167 | } | 212 | } |
168 | 213 | ||
169 | 214 | ||
170 | |||
171 | /** | 215 | /** |
172 | * Start the process for the given plugin. | 216 | * Create a channel to communicate with a process wrapping |
217 | * the plugin of the given name. Starts the process as well. | ||
218 | * | ||
219 | * @param short_libname name of the plugin | ||
220 | * @param shm memory to share with the process | ||
221 | * @return NULL on error, otherwise IPC channel | ||
173 | */ | 222 | */ |
174 | static void | 223 | struct EXTRACTOR_Channel * |
175 | start_process (struct EXTRACTOR_PluginList *plugin) | 224 | EXTRACTOR_IPC_channel_create_ (const char *short_libname, |
225 | struct EXTRACTOR_SharedMemory *shm) | ||
176 | { | 226 | { |
227 | struct EXTRACTOR_Channel *channel; | ||
177 | int p1[2]; | 228 | int p1[2]; |
178 | int p2[2]; | 229 | int p2[2]; |
179 | pid_t pid; | 230 | pid_t pid; |
180 | int status; | 231 | int status; |
181 | 232 | ||
182 | switch (plugin->flags) | 233 | if (NULL == (channel = malloc (sizeof (struct EXTRACTOR_Channel)))) |
183 | { | 234 | return NULL; |
184 | case EXTRACTOR_OPTION_DEFAULT_POLICY: | 235 | channel->shm = shm; |
185 | if (-1 != plugin->cpid && 0 != plugin->cpid) | 236 | channel->short_libname = short_libname; |
186 | return; | ||
187 | break; | ||
188 | case EXTRACTOR_OPTION_OUT_OF_PROCESS_NO_RESTART: | ||
189 | if (0 != plugin->cpid) | ||
190 | return; | ||
191 | break; | ||
192 | case EXTRACTOR_OPTION_IN_PROCESS: | ||
193 | return; | ||
194 | break; | ||
195 | case EXTRACTOR_OPTION_DISABLED: | ||
196 | return; | ||
197 | break; | ||
198 | } | ||
199 | |||
200 | plugin->cpid = -1; | ||
201 | if (0 != pipe (p1)) | 237 | if (0 != pipe (p1)) |
202 | { | 238 | { |
203 | plugin->flags = EXTRACTOR_OPTION_DISABLED; | 239 | free (channel); |
204 | return; | 240 | return NULL; |
205 | } | 241 | } |
206 | if (0 != pipe (p2)) | 242 | if (0 != pipe (p2)) |
207 | { | 243 | { |
208 | close (p1[0]); | 244 | (void) close (p1[0]); |
209 | close (p1[1]); | 245 | (void) close (p1[1]); |
210 | plugin->flags = EXTRACTOR_OPTION_DISABLED; | 246 | free (channel); |
211 | return; | 247 | return; |
212 | } | 248 | } |
213 | pid = fork (); | 249 | pid = fork (); |
214 | plugin->cpid = pid; | ||
215 | if (pid == -1) | 250 | if (pid == -1) |
216 | { | 251 | { |
217 | close (p1[0]); | 252 | (void) close (p1[0]); |
218 | close (p1[1]); | 253 | (void) close (p1[1]); |
219 | close (p2[0]); | 254 | (void) close (p2[0]); |
220 | close (p2[1]); | 255 | (void) close (p2[1]); |
221 | plugin->flags = EXTRACTOR_OPTION_DISABLED; | 256 | free (channel); |
222 | return; | 257 | return NULL; |
223 | } | 258 | } |
224 | if (pid == 0) | 259 | if (0 == pid) |
225 | { | 260 | { |
226 | close (p1[1]); | 261 | (void) close (p1[1]); |
227 | close (p2[0]); | 262 | (void) close (p2[0]); |
228 | plugin_main (plugin, p1[0], p2[1]); | 263 | EXTRACTOR_plugin_main_ (short_libname, p1[0], p2[1]); |
229 | _exit (0); | 264 | _exit (0); |
230 | } | 265 | } |
231 | close (p1[0]); | 266 | (void) close (p1[0]); |
232 | close (p2[1]); | 267 | (void) close (p2[1]); |
233 | plugin->cpipe_in = fdopen (p1[1], "w"); | 268 | channel->cpipe_in = p1[1]; |
234 | if (plugin->cpipe_in == NULL) | 269 | channel->cpipe_out = p2[0]; |
235 | { | 270 | channel->cpid = pid; |
236 | perror ("fdopen"); | 271 | return channel; |
237 | (void) kill (plugin->cpid, SIGKILL); | ||
238 | waitpid (plugin->cpid, &status, 0); | ||
239 | close (p1[1]); | ||
240 | close (p2[0]); | ||
241 | plugin->cpid = -1; | ||
242 | plugin->flags = EXTRACTOR_OPTION_DISABLED; | ||
243 | return; | ||
244 | } | ||
245 | plugin->cpipe_out = p2[0]; | ||
246 | } | ||
247 | |||
248 | |||
249 | /** | ||
250 | * Stop the child process of this plugin. | ||
251 | */ | ||
252 | static void | ||
253 | stop_process (struct EXTRACTOR_PluginList *plugin) | ||
254 | { | ||
255 | int status; | ||
256 | |||
257 | #if DEBUG | ||
258 | if (plugin->cpid == -1) | ||
259 | fprintf (stderr, | ||
260 | "Plugin `%s' choked on this input\n", | ||
261 | plugin->short_libname); | ||
262 | #endif | ||
263 | if ( (plugin->cpid == -1) || | ||
264 | (plugin->cpid == 0) ) | ||
265 | return; | ||
266 | kill (plugin->cpid, SIGKILL); | ||
267 | waitpid (plugin->cpid, &status, 0); | ||
268 | plugin->cpid = -1; | ||
269 | close (plugin->cpipe_out); | ||
270 | fclose (plugin->cpipe_in); | ||
271 | plugin->cpipe_out = -1; | ||
272 | plugin->cpipe_in = NULL; | ||
273 | |||
274 | if (plugin->flags != EXTRACTOR_OPTION_DEFAULT_POLICY) | ||
275 | plugin->flags = EXTRACTOR_OPTION_DISABLED; | ||
276 | |||
277 | plugin->seek_request = -1; | ||
278 | } | ||
279 | |||
280 | |||
281 | static int | ||
282 | write_plugin_data (const struct EXTRACTOR_PluginList *plugin) | ||
283 | { | ||
284 | /* This function is only necessary on W32. On POSIX | ||
285 | * systems plugin inherits its own data from the parent */ | ||
286 | return 0; | ||
287 | } | ||
288 | |||
289 | |||
290 | /** | ||
291 | * Initializes an extracting session for a plugin. | ||
292 | * opens the file/shm (only in OPMODE_FILE) | ||
293 | * sets shm_ptr to NULL (unmaps it, if it was mapped) | ||
294 | * sets position to 0 | ||
295 | * initializes file size to 'fsize' (may be -1) | ||
296 | * sets seek request to 0 | ||
297 | * | ||
298 | * @param plugin plugin context | ||
299 | * @param operation_mode the mode of operation (OPMODE_*) | ||
300 | * @param fsize size of the source file (may be -1) | ||
301 | * @param shm_name name of the shm or file to open | ||
302 | * @return 0 on success, non-0 on error. | ||
303 | */ | ||
304 | static int | ||
305 | init_state_method (struct EXTRACTOR_PluginList *plugin, | ||
306 | uint8_t operation_mode, | ||
307 | int64_t fsize, | ||
308 | const char *shm_name) | ||
309 | { | ||
310 | plugin->seek_request = 0; | ||
311 | if (plugin->shm_ptr != NULL) | ||
312 | munmap (plugin->shm_ptr, plugin->map_size); | ||
313 | plugin->shm_ptr = NULL; | ||
314 | if (operation_mode == OPMODE_FILE) | ||
315 | { | ||
316 | if (-1 == plugin_open_file (plugin, shm_name)) | ||
317 | return 1; | ||
318 | } | ||
319 | else if (-1 == plugin_open_shm (plugin, shm_name)) | ||
320 | return 1; | ||
321 | plugin->fsize = fsize; | ||
322 | plugin->shm_pos = 0; | ||
323 | plugin->fpos = 0; | ||
324 | return 0; | ||
325 | } | 272 | } |
326 | 273 | ||
327 | 274 | ||
328 | /** | 275 | /** |
329 | * Setup a shared memory segment. | 276 | * Destroy communication channel with a plugin/process. Also |
277 | * destroys the process. | ||
330 | * | 278 | * |
331 | * @param ptr set to the location of the shm segment | 279 | * @param channel channel to communicate with the plugin |
332 | * @param shmid where to store the shm ID | ||
333 | * @param fn name of the shared segment | ||
334 | * @param fn_size size available in fn | ||
335 | * @param size number of bytes to allocated for the segment | ||
336 | * @return 0 on success | ||
337 | */ | 280 | */ |
338 | static int | 281 | void |
339 | make_shm_posix (void **ptr, | 282 | EXTRACTOR_IPC_channel_destroy_ (struct EXTRACTOR_Channel *channel) |
340 | int *shmid, | ||
341 | char *fn, | ||
342 | size_t fn_size, size_t size) | ||
343 | { | 283 | { |
344 | const char *tpath; | 284 | int status; |
345 | #if SOMEBSD | ||
346 | /* this works on FreeBSD, not sure about others... */ | ||
347 | tpath = getenv ("TMPDIR"); | ||
348 | if (tpath == NULL) | ||
349 | tpath = "/tmp/"; | ||
350 | #else | ||
351 | tpath = "/"; /* Linux */ | ||
352 | #endif | ||
353 | snprintf (fn, fn_size, "%slibextractor-shm-%u-%u", tpath, getpid(), | ||
354 | (unsigned int) RANDOM()); | ||
355 | *shmid = shm_open (fn, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); | ||
356 | *ptr = NULL; | ||
357 | if (-1 == *shmid) | ||
358 | return 1; | ||
359 | if ((0 != ftruncate (*shmid, size)) || | ||
360 | (NULL == (*ptr = mmap (NULL, size, PROT_WRITE, MAP_SHARED, *shmid, 0))) || | ||
361 | (*ptr == (void*) -1) ) | ||
362 | { | ||
363 | close (*shmid); | ||
364 | *shmid = -1; | ||
365 | shm_unlink (fn); | ||
366 | return 1; | ||
367 | } | ||
368 | return 0; | ||
369 | } | ||
370 | |||
371 | 285 | ||
372 | static void | 286 | (void) kill (channel->cpid, SIGKILL); |
373 | destroy_shm_posix (void *ptr, int shm_id, size_t size, char *shm_name) | 287 | (void) waitpid (channel->cpid, &status, 0); |
374 | { | 288 | (void) close (channel->cpipe_out); |
375 | if (NULL != ptr) | 289 | (void) close (channel->cpipe_in); |
376 | munmap (ptr, size); | 290 | free (channel); |
377 | if (shm_id != -1) | ||
378 | close (shm_id); | ||
379 | shm_unlink (shm_name); | ||
380 | } | 291 | } |
381 | 292 | ||
382 | 293 | ||
383 | /** | 294 | /** |
384 | * Receive 'size' bytes from plugin, store them in 'buf' | 295 | * Send data via the given IPC channel (blocking). |
385 | * | 296 | * |
386 | * @param plugin plugin context | 297 | * @param channel channel to communicate with the plugin |
387 | * @param buf buffer to fill | 298 | * @param buf data to send |
388 | * @param size number of bytes to read | 299 | * @param size number of bytes in buf to send |
389 | * @return number of bytes read, 0 on EOS, < 0 on error | 300 | * @return -1 on error, number of bytes sent on success |
301 | * (never does partial writes) | ||
390 | */ | 302 | */ |
391 | static int | 303 | ssize_t |
392 | plugin_read (struct EXTRACTOR_PluginList *plugin, | 304 | EXTRACTOR_IPC_channel_send_ (struct EXTRACTOR_Channel *channel, |
393 | void *buf, | 305 | const void *data, |
394 | size_t size) | 306 | size_t size) |
395 | { | 307 | { |
396 | char *rb = buf; | 308 | const char *cdata = data; |
397 | ssize_t read_result; | 309 | size_t off = 0; |
398 | size_t read_count = 0; | 310 | ssize_t ret; |
399 | 311 | ||
400 | while (read_count < size) | 312 | while (off < size) |
401 | { | 313 | { |
402 | read_result = read (plugin->cpipe_out, | 314 | ret = write (channel->cpipe_in, &cdata[off], size - off); |
403 | &rb[read_count], size - read_count); | 315 | if (ret <= 0) |
404 | if (read_result <= 0) | 316 | return -1; |
405 | return read_result; | 317 | off += ret; |
406 | read_count += read_result; | 318 | } |
407 | } | 319 | return size; |
408 | return read_count; | ||
409 | } | 320 | } |
410 | 321 | ||
411 | 322 | ||
412 | /** | 323 | /** |
324 | * Receive data from any of the given IPC channels (blocking). | ||
413 | * Wait for one of the plugins to reply. | 325 | * Wait for one of the plugins to reply. |
414 | * Selects on plugin output pipes, runs receive_reply() | 326 | * Selects on plugin output pipes, runs 'receive_reply' |
415 | * on each activated pipe until it gets a seek request | 327 | * on each activated pipe until it gets a seek request |
416 | * or a done message. Called repeatedly by the user until all pipes are dry or | 328 | * or a done message. Called repeatedly by the user until all pipes are dry or |
417 | * broken. | 329 | * broken. |
418 | * | 330 | * |
419 | * @param plugins to select upon | 331 | * @param channels array of channels, channels that break may be set to NULL |
420 | * @param proc metadata callback | 332 | * @param num_channels length of the 'channels' array |
421 | * @param proc_cls callback cls | 333 | * @param proc function to call to process messages (may be called |
422 | * @return number of dry/broken pipes since last call, -1 on error or if no | 334 | * more than once) |
423 | * plugins reply in 10 seconds. | 335 | * @param proc_cls closure for 'proc' |
336 | * @return -1 on error, 1 on success | ||
424 | */ | 337 | */ |
425 | static int | 338 | int |
426 | wait_for_reply (struct EXTRACTOR_PluginList *plugins, | 339 | EXTRACTOR_IPC_channel_recv_ (struct EXTRACTOR_Channel **channels, |
427 | EXTRACTOR_MetaDataProcessor proc, void *proc_cls) | 340 | unsigned int num_channels, |
341 | EXTRACTOR_ChannelMessageProcessor proc, | ||
342 | void *proc_cls) | ||
428 | { | 343 | { |
429 | int ready; | ||
430 | int result; | ||
431 | struct timeval tv; | 344 | struct timeval tv; |
432 | fd_set to_check; | 345 | fd_set to_check; |
433 | int highest = 0; | 346 | int max; |
434 | int read_result; | 347 | unsigned int i; |
435 | struct EXTRACTOR_PluginList *ppos; | 348 | struct EXTRACTOR_Channel *channel; |
349 | ssize_t ret; | ||
350 | ssize_t iret; | ||
436 | 351 | ||
437 | FD_ZERO (&to_check); | 352 | FD_ZERO (&to_check); |
438 | for (ppos = plugins; NULL != ppos; ppos = ppos->next) | 353 | max = -1; |
354 | for (i=0;i<num_channels;i++) | ||
439 | { | 355 | { |
440 | switch (ppos->flags) | 356 | channel = channels[i]; |
441 | { | 357 | if (NULL == channel) |
442 | case EXTRACTOR_OPTION_DEFAULT_POLICY: | 358 | continue; |
443 | case EXTRACTOR_OPTION_OUT_OF_PROCESS_NO_RESTART: | 359 | FD_SET (channel->cpipe_out, &to_check); |
444 | if (ppos->seek_request == -1) | 360 | if (max < channel->cpipe_out) |
445 | continue; | 361 | max = channel->cpipe_out; |
446 | FD_SET (ppos->cpipe_out, &to_check); | ||
447 | if (highest < ppos->cpipe_out) | ||
448 | highest = ppos->cpipe_out; | ||
449 | break; | ||
450 | case EXTRACTOR_OPTION_IN_PROCESS: | ||
451 | break; | ||
452 | case EXTRACTOR_OPTION_DISABLED: | ||
453 | break; | ||
454 | } | ||
455 | } | 362 | } |
456 | |||
457 | tv.tv_sec = 10; | 363 | tv.tv_sec = 10; |
458 | tv.tv_usec = 0; | 364 | tv.tv_usec = 0; |
459 | ready = select (highest + 1, &to_check, NULL, NULL, &tv); | 365 | if (-1 == select (max + 1, &to_check, NULL, NULL, &tv)) |
460 | if (ready <= 0) | ||
461 | /* an error or timeout -> something's wrong or all plugins hung up */ | ||
462 | return -1; | ||
463 | |||
464 | result = 0; | ||
465 | for (ppos = plugins; NULL != ppos; ppos = ppos->next) | ||
466 | { | 366 | { |
467 | switch (ppos->flags) | 367 | /* an error or timeout -> something's wrong or all plugins hung up */ |
368 | return -1; | ||
369 | } | ||
370 | for (i=0;i<num_channels;i++) | ||
371 | { | ||
372 | channel = channels[i]; | ||
373 | if (NULL == channel) | ||
374 | continue; | ||
375 | if (! FD_ISSET (channel->cpipe_out, &to_check)) | ||
376 | continue; | ||
377 | if ( (-1 == (iret = read (channel->cpipe_out, | ||
378 | &channel->data[channel->size], | ||
379 | CHANNEL_BUFFER_SIZE - channel->size)) ) || | ||
380 | (ret = EXTRACTOR_IPC_process_reply_ (channel->data, | ||
381 | channel->size + iret, | ||
382 | proc, proc_cls)) ) | ||
383 | { | ||
384 | EXTRACTOR_IPC_channel_destroy (channel); | ||
385 | channels[i] = NULL; | ||
386 | } | ||
387 | else | ||
468 | { | 388 | { |
469 | case EXTRACTOR_OPTION_DEFAULT_POLICY: | 389 | memmove (channel->data, |
470 | case EXTRACTOR_OPTION_OUT_OF_PROCESS_NO_RESTART: | 390 | &channel->data[ret], |
471 | if (ppos->seek_request == -1) | 391 | channel->size + iret - ret); |
472 | continue; | 392 | channel->size = channel->size + iret - ret; |
473 | if (FD_ISSET (ppos->cpipe_out, &to_check)) | ||
474 | { | ||
475 | read_result = receive_reply (ppos, proc, proc_cls); | ||
476 | if (read_result < 0) | ||
477 | { | ||
478 | stop_process (ppos); | ||
479 | } | ||
480 | result += 1; | ||
481 | } | ||
482 | break; | ||
483 | case EXTRACTOR_OPTION_IN_PROCESS: | ||
484 | break; | ||
485 | case EXTRACTOR_OPTION_DISABLED: | ||
486 | break; | ||
487 | } | 393 | } |
488 | } | 394 | } |
489 | return result; | 395 | return 1; |
490 | } | 396 | } |
397 | |||
398 | /* end of extractor_ipc_gnu.c */ | ||