extractor_ipc_gnu.c (14893B)
1 /* 2 This file is part of libextractor. 3 Copyright (C) 2012 Vidyut Samanta and Christian Grothoff 4 5 libextractor is free software; you can redistribute it and/or modify 6 it under the terms of the GNU General Public License as published 7 by the Free Software Foundation; either version 3, or (at your 8 option) any later version. 9 10 libextractor is distributed in the hope that it will be useful, but 11 WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 General Public License for more details. 14 15 You should have received a copy of the GNU General Public License 16 along with libextractor; see the file COPYING. If not, write to the 17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 18 Boston, MA 02110-1301, USA. 19 */ 20 /** 21 * @file main/extractor_ipc_gnu.c 22 * @brief IPC with plugin for GNU/POSIX systems 23 * @author Christian Grothoff 24 */ 25 #include "platform.h" 26 #include "extractor.h" 27 #include "extractor_datasource.h" 28 #include "extractor_logging.h" 29 #include "extractor_plugin_main.h" 30 #include "extractor_plugins.h" 31 #include "extractor_ipc.h" 32 #include <dirent.h> 33 #include <sys/types.h> 34 #include <sys/wait.h> 35 #include <sys/shm.h> 36 #include <signal.h> 37 #if HAVE_SYS_APPARMOR_H 38 #include <sys/apparmor.h> 39 #endif 40 41 /** 42 * A shared memory resource (often shared with several 43 * other processes). 44 */ 45 struct EXTRACTOR_SharedMemory 46 { 47 /** 48 * Pointer to the mapped region of the shm (covers the whole shm) 49 */ 50 void *shm_ptr; 51 52 /** 53 * Allocated size of the shm 54 */ 55 size_t shm_size; 56 57 /** 58 * POSIX id of the shm into which data is uncompressed 59 */ 60 int shm_id; 61 62 /** 63 * Name of the shm 64 */ 65 char shm_name[MAX_SHM_NAME + 1]; 66 67 /** 68 * Reference counter describing how many references share this SHM. 69 */ 70 unsigned int rc; 71 72 }; 73 74 75 /** 76 * Definition of an IPC communication channel with 77 * some plugin. 78 */ 79 struct EXTRACTOR_Channel 80 { 81 82 /** 83 * Buffer for reading data from the plugin. 84 */ 85 char *mdata; 86 87 /** 88 * Size of the @e mdata buffer. 89 */ 90 size_t mdata_size; 91 92 /** 93 * Memory segment shared with this process. 94 */ 95 struct EXTRACTOR_SharedMemory *shm; 96 97 /** 98 * The plugin this channel is to communicate with. 99 */ 100 struct EXTRACTOR_PluginList *plugin; 101 102 /** 103 * Pipe used to communicate information to the plugin child process. 104 * NULL if not initialized. 105 */ 106 int cpipe_in; 107 108 /** 109 * Number of valid bytes in the channel's buffer. 110 */ 111 size_t size; 112 113 /** 114 * Pipe used to read information about extracted meta data from 115 * the plugin child process. -1 if not initialized. 116 */ 117 int cpipe_out; 118 119 /** 120 * Process ID of the child process for this plugin. 0 for none. 121 */ 122 pid_t cpid; 123 124 }; 125 126 127 /** 128 * Create a shared memory area. 129 * 130 * @param size size of the shared area 131 * @return NULL on error 132 */ 133 struct EXTRACTOR_SharedMemory * 134 EXTRACTOR_IPC_shared_memory_create_ (size_t size) 135 { 136 struct EXTRACTOR_SharedMemory *shm; 137 const char *tpath; 138 139 if (NULL == (shm = malloc (sizeof (struct EXTRACTOR_SharedMemory)))) 140 { 141 LOG_STRERROR ("malloc"); 142 return NULL; 143 } 144 #if SOMEBSD 145 /* this works on FreeBSD, not sure about others... */ 146 tpath = getenv ("TMPDIR"); 147 if (NULL == tpath) 148 tpath = "/tmp/"; 149 #else 150 tpath = "/"; /* Linux */ 151 #endif 152 snprintf (shm->shm_name, 153 MAX_SHM_NAME, 154 "%sLE-%u-%u", 155 tpath, 156 getpid (), 157 (unsigned int) random ()); 158 if (-1 == (shm->shm_id = shm_open (shm->shm_name, 159 O_RDWR | O_CREAT, 160 S_IRUSR | S_IWUSR))) 161 { 162 LOG_STRERROR_FILE ("shm_open", 163 shm->shm_name); 164 free (shm); 165 return NULL; 166 } 167 if ( (0 != ftruncate (shm->shm_id, size)) || 168 (NULL == (shm->shm_ptr = mmap (NULL, 169 size, 170 PROT_WRITE, 171 MAP_SHARED, 172 shm->shm_id, 173 0))) || 174 (((void*) -1) == shm->shm_ptr) ) 175 { 176 LOG_STRERROR ("ftruncate/mmap"); 177 (void) close (shm->shm_id); 178 (void) shm_unlink (shm->shm_name); 179 free (shm); 180 return NULL; 181 } 182 shm->shm_size = size; 183 shm->rc = 0; 184 return shm; 185 } 186 187 188 /** 189 * Change the reference counter for this shm instance. 190 * 191 * @param shm instance to update 192 * @param delta value to change RC by 193 * @return new RC 194 */ 195 unsigned int 196 EXTRACTOR_IPC_shared_memory_change_rc_ (struct EXTRACTOR_SharedMemory *shm, 197 int delta) 198 { 199 shm->rc += delta; 200 return shm->rc; 201 } 202 203 204 /** 205 * Destroy shared memory area. 206 * 207 * @param shm memory area to destroy 208 * @return NULL on error 209 */ 210 void 211 EXTRACTOR_IPC_shared_memory_destroy_ (struct EXTRACTOR_SharedMemory *shm) 212 { 213 munmap (shm->shm_ptr, 214 shm->shm_size); 215 (void) close (shm->shm_id); 216 (void) shm_unlink (shm->shm_name); 217 free (shm); 218 } 219 220 221 /** 222 * Initialize shared memory area from data source. 223 * 224 * @param shm memory area to initialize 225 * @param ds data source to use for initialization 226 * @param off offset to use in data source 227 * @param size number of bytes to copy 228 * @return -1 on error, otherwise number of bytes copied 229 */ 230 ssize_t 231 EXTRACTOR_IPC_shared_memory_set_ (struct EXTRACTOR_SharedMemory *shm, 232 struct EXTRACTOR_Datasource *ds, 233 uint64_t off, 234 size_t size) 235 { 236 if (-1 == 237 EXTRACTOR_datasource_seek_ (ds, 238 off, 239 SEEK_SET)) 240 { 241 LOG ("Failed to set IPC memory due to seek error\n"); 242 return -1; 243 } 244 if (size > shm->shm_size) 245 size = shm->shm_size; 246 return EXTRACTOR_datasource_read_ (ds, 247 shm->shm_ptr, 248 size); 249 } 250 251 252 /** 253 * Query datasource for current position 254 * 255 * @param ds data source to query 256 * @return current position in the datasource or UINT_MAX on error 257 */ 258 uint64_t 259 EXTRACTOR_datasource_get_pos_ (struct EXTRACTOR_Datasource *ds) 260 { 261 int64_t pos = EXTRACTOR_datasource_seek_ (ds, 262 0, 263 SEEK_CUR); 264 265 if (-1 == pos) 266 return UINT_MAX; 267 return pos; 268 } 269 270 271 /** 272 * Create a channel to communicate with a process wrapping 273 * the plugin of the given name. Starts the process as well. 274 * 275 * @param plugin the plugin 276 * @param shm memory to share with the process 277 * @return NULL on error, otherwise IPC channel 278 */ 279 struct EXTRACTOR_Channel * 280 EXTRACTOR_IPC_channel_create_ (struct EXTRACTOR_PluginList *plugin, 281 struct EXTRACTOR_SharedMemory *shm) 282 { 283 struct EXTRACTOR_Channel *channel; 284 int p1[2]; 285 int p2[2]; 286 pid_t pid; 287 struct InitMessage *init; 288 size_t slen; 289 290 if (NULL == (channel = malloc (sizeof (struct EXTRACTOR_Channel)))) 291 { 292 LOG_STRERROR ("malloc"); 293 return NULL; 294 } 295 channel->mdata_size = 1024; 296 if (NULL == (channel->mdata = malloc (channel->mdata_size))) 297 { 298 LOG_STRERROR ("malloc"); 299 free (channel); 300 return NULL; 301 } 302 channel->shm = shm; 303 channel->plugin = plugin; 304 channel->size = 0; 305 if (0 != pipe (p1)) 306 { 307 LOG_STRERROR ("pipe"); 308 free (channel->mdata); 309 free (channel); 310 return NULL; 311 } 312 if (0 != pipe (p2)) 313 { 314 LOG_STRERROR ("pipe"); 315 (void) close (p1[0]); 316 (void) close (p1[1]); 317 free (channel->mdata); 318 free (channel); 319 return NULL; 320 } 321 pid = fork (); 322 if (pid == -1) 323 { 324 LOG_STRERROR ("fork"); 325 (void) close (p1[0]); 326 (void) close (p1[1]); 327 (void) close (p2[0]); 328 (void) close (p2[1]); 329 free (channel->mdata); 330 free (channel); 331 return NULL; 332 } 333 if (0 == pid) 334 { 335 (void) close (p1[1]); 336 (void) close (p2[0]); 337 free (channel->mdata); 338 free (channel); 339 #if HAVE_SYS_APPARMOR_H 340 #if HAVE_APPARMOR 341 if (0 > aa_change_profile ("libextractor")) 342 { 343 int eno = errno; 344 345 if ( (EINVAL != eno) && 346 (ENOENT != eno) ) 347 { 348 fprintf (stderr, 349 "Failure changing AppArmor profile: %s\n", 350 strerror (errno)); 351 _exit (1); 352 } 353 } 354 #endif 355 #endif 356 EXTRACTOR_plugin_main_ (plugin, p1[0], p2[1]); 357 _exit (0); 358 } 359 (void) close (p1[0]); 360 (void) close (p2[1]); 361 channel->cpipe_in = p1[1]; 362 channel->cpipe_out = p2[0]; 363 channel->cpid = pid; 364 slen = strlen (shm->shm_name) + 1; 365 if (NULL == (init = malloc (sizeof (struct InitMessage) + slen))) 366 { 367 LOG_STRERROR ("malloc"); 368 EXTRACTOR_IPC_channel_destroy_ (channel); 369 return NULL; 370 } 371 init->opcode = MESSAGE_INIT_STATE; 372 init->reserved = 0; 373 init->reserved2 = 0; 374 init->shm_name_length = slen; 375 init->shm_map_size = shm->shm_size; 376 memcpy (&init[1], shm->shm_name, slen); 377 if (sizeof (struct InitMessage) + slen != 378 EXTRACTOR_IPC_channel_send_ (channel, 379 init, 380 sizeof (struct InitMessage) + slen) ) 381 { 382 LOG ("Failed to send INIT_STATE message to plugin\n"); 383 EXTRACTOR_IPC_channel_destroy_ (channel); 384 free (init); 385 return NULL; 386 } 387 free (init); 388 return channel; 389 } 390 391 392 /** 393 * Destroy communication channel with a plugin/process. Also 394 * destroys the process. 395 * 396 * @param channel channel to communicate with the plugin 397 */ 398 void 399 EXTRACTOR_IPC_channel_destroy_ (struct EXTRACTOR_Channel *channel) 400 { 401 int status; 402 403 if (0 != kill (channel->cpid, SIGKILL)) 404 LOG_STRERROR ("kill"); 405 if (-1 == waitpid (channel->cpid, &status, 0)) 406 LOG_STRERROR ("waitpid"); 407 if (0 != close (channel->cpipe_out)) 408 LOG_STRERROR ("close"); 409 if (0 != close (channel->cpipe_in)) 410 LOG_STRERROR ("close"); 411 if (NULL != channel->plugin) 412 channel->plugin->channel = NULL; 413 free (channel->mdata); 414 free (channel); 415 } 416 417 418 /** 419 * Send data via the given IPC channel (blocking). 420 * 421 * @param channel channel to communicate with the plugin 422 * @param buf data to send 423 * @param size number of bytes in buf to send 424 * @return -1 on error, number of bytes sent on success 425 * (never does partial writes) 426 */ 427 ssize_t 428 EXTRACTOR_IPC_channel_send_ (struct EXTRACTOR_Channel *channel, 429 const void *data, 430 size_t size) 431 { 432 const char *cdata = data; 433 size_t off = 0; 434 ssize_t ret; 435 436 while (off < size) 437 { 438 ret = write (channel->cpipe_in, &cdata[off], size - off); 439 if (ret <= 0) 440 { 441 if (-1 == ret) 442 LOG_STRERROR ("write"); 443 return -1; 444 } 445 off += ret; 446 } 447 return size; 448 } 449 450 451 /** 452 * Receive data from any of the given IPC channels (blocking). 453 * Wait for one of the plugins to reply. 454 * Selects on plugin output pipes, runs 'receive_reply' 455 * on each activated pipe until it gets a seek request 456 * or a done message. Called repeatedly by the user until all pipes are dry or 457 * broken. 458 * 459 * @param channels array of channels, channels that break may be set to NULL 460 * @param num_channels length of the @a channels array 461 * @param proc function to call to process messages (may be called 462 * more than once) 463 * @param proc_cls closure for @a proc 464 * @return -1 on error, 1 on success 465 */ 466 int 467 EXTRACTOR_IPC_channel_recv_ (struct EXTRACTOR_Channel **channels, 468 unsigned int num_channels, 469 EXTRACTOR_ChannelMessageProcessor proc, 470 void *proc_cls) 471 { 472 struct timeval tv; 473 fd_set to_check; 474 int max; 475 unsigned int i; 476 struct EXTRACTOR_Channel *channel; 477 ssize_t ret; 478 ssize_t iret; 479 char *ndata; 480 int closed_channel; 481 482 FD_ZERO (&to_check); 483 max = -1; 484 for (i = 0; i<num_channels; i++) 485 { 486 channel = channels[i]; 487 if (NULL == channel) 488 continue; 489 FD_SET (channel->cpipe_out, &to_check); 490 if (max < channel->cpipe_out) 491 max = channel->cpipe_out; 492 } 493 if (-1 == max) 494 { 495 return 1; /* nothing left to do! */ 496 } 497 tv.tv_sec = 0; 498 tv.tv_usec = 500000; /* 500 ms */ 499 if (0 >= select (max + 1, &to_check, NULL, NULL, &tv)) 500 { 501 /* an error or timeout -> something's wrong or all plugins hung up */ 502 closed_channel = 0; 503 for (i = 0; i<num_channels; i++) 504 { 505 channel = channels[i]; 506 if (NULL == channel) 507 continue; 508 if (-1 == channel->plugin->seek_request) 509 { 510 /* plugin blocked for too long, kill channel */ 511 LOG ("Channel blocked, closing channel to %s\n", 512 channel->plugin->libname); 513 channel->plugin->channel = NULL; 514 channel->plugin->round_finished = 1; 515 EXTRACTOR_IPC_channel_destroy_ (channel); 516 channels[i] = NULL; 517 closed_channel = 1; 518 } 519 } 520 if (1 == closed_channel) 521 return 1; 522 /* strange, no channel is to blame, let's die just to be safe */ 523 if ((EINTR != errno) && (0 != errno)) 524 LOG_STRERROR ("select"); 525 return -1; 526 } 527 for (i = 0; i<num_channels; i++) 528 { 529 channel = channels[i]; 530 if (NULL == channel) 531 continue; 532 if (! FD_ISSET (channel->cpipe_out, &to_check)) 533 continue; 534 if (channel->mdata_size == channel->size) 535 { 536 /* not enough space, need to grow allocation (if allowed) */ 537 if (MAX_META_DATA == channel->mdata_size) 538 { 539 LOG ("Inbound message from channel too large, aborting\n"); 540 EXTRACTOR_IPC_channel_destroy_ (channel); 541 channels[i] = NULL; 542 continue; 543 } 544 channel->mdata_size *= 2; 545 if (channel->mdata_size > MAX_META_DATA) 546 channel->mdata_size = MAX_META_DATA; 547 if (NULL == (ndata = realloc (channel->mdata, 548 channel->mdata_size))) 549 { 550 LOG_STRERROR ("realloc"); 551 EXTRACTOR_IPC_channel_destroy_ (channel); 552 channels[i] = NULL; 553 continue; 554 } 555 channel->mdata = ndata; 556 } 557 if ( (-1 == (iret = read (channel->cpipe_out, 558 &channel->mdata[channel->size], 559 channel->mdata_size - channel->size)) ) || 560 (0 == iret) || 561 (-1 == (ret = EXTRACTOR_IPC_process_reply_ (channel->plugin, 562 channel->mdata, 563 channel->size + iret, 564 proc, proc_cls)) ) ) 565 { 566 if (-1 == iret) 567 LOG_STRERROR ("read"); 568 LOG ("Read error from channel, closing channel %s\n", 569 channel->plugin->libname); 570 EXTRACTOR_IPC_channel_destroy_ (channel); 571 channels[i] = NULL; 572 continue; 573 } 574 else 575 { 576 channel->size = channel->size + iret - ret; 577 memmove (channel->mdata, 578 &channel->mdata[ret], 579 channel->size); 580 } 581 } 582 return 1; 583 } 584 585 586 /* end of extractor_ipc_gnu.c */