libextractor

GNU libextractor
Log | Files | Refs | Submodules | README | LICENSE

extractor_ipc_gnu.c (14893B)


      1 /*
      2      This file is part of libextractor.
      3      Copyright (C) 2012 Vidyut Samanta and Christian Grothoff
      4 
      5      libextractor is free software; you can redistribute it and/or modify
      6      it under the terms of the GNU General Public License as published
      7      by the Free Software Foundation; either version 3, or (at your
      8      option) any later version.
      9 
     10      libextractor is distributed in the hope that it will be useful, but
     11      WITHOUT ANY WARRANTY; without even the implied warranty of
     12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     13      General Public License for more details.
     14 
     15      You should have received a copy of the GNU General Public License
     16      along with libextractor; see the file COPYING.  If not, write to the
     17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
     18      Boston, MA 02110-1301, USA.
     19  */
     20 /**
     21  * @file main/extractor_ipc_gnu.c
     22  * @brief IPC with plugin for GNU/POSIX systems
     23  * @author Christian Grothoff
     24  */
     25 #include "platform.h"
     26 #include "extractor.h"
     27 #include "extractor_datasource.h"
     28 #include "extractor_logging.h"
     29 #include "extractor_plugin_main.h"
     30 #include "extractor_plugins.h"
     31 #include "extractor_ipc.h"
     32 #include <dirent.h>
     33 #include <sys/types.h>
     34 #include <sys/wait.h>
     35 #include <sys/shm.h>
     36 #include <signal.h>
     37 #if HAVE_SYS_APPARMOR_H
     38 #include <sys/apparmor.h>
     39 #endif
     40 
     41 /**
     42  * A shared memory resource (often shared with several
     43  * other processes).
     44  */
     45 struct EXTRACTOR_SharedMemory
     46 {
     47   /**
     48    * Pointer to the mapped region of the shm (covers the whole shm)
     49    */
     50   void *shm_ptr;
     51 
     52   /**
     53    * Allocated size of the shm
     54    */
     55   size_t shm_size;
     56 
     57   /**
     58    * POSIX id of the shm into which data is uncompressed
     59    */
     60   int shm_id;
     61 
     62   /**
     63    * Name of the shm
     64    */
     65   char shm_name[MAX_SHM_NAME + 1];
     66 
     67   /**
     68    * Reference counter describing how many references share this SHM.
     69    */
     70   unsigned int rc;
     71 
     72 };
     73 
     74 
     75 /**
     76  * Definition of an IPC communication channel with
     77  * some plugin.
     78  */
     79 struct EXTRACTOR_Channel
     80 {
     81 
     82   /**
     83    * Buffer for reading data from the plugin.
     84    */
     85   char *mdata;
     86 
     87   /**
     88    * Size of the @e mdata buffer.
     89    */
     90   size_t mdata_size;
     91 
     92   /**
     93    * Memory segment shared with this process.
     94    */
     95   struct EXTRACTOR_SharedMemory *shm;
     96 
     97   /**
     98    * The plugin this channel is to communicate with.
     99    */
    100   struct EXTRACTOR_PluginList *plugin;
    101 
    102   /**
    103    * Pipe used to communicate information to the plugin child process.
    104    * NULL if not initialized.
    105    */
    106   int cpipe_in;
    107 
    108   /**
    109    * Number of valid bytes in the channel's buffer.
    110    */
    111   size_t size;
    112 
    113   /**
    114    * Pipe used to read information about extracted meta data from
    115    * the plugin child process.  -1 if not initialized.
    116    */
    117   int cpipe_out;
    118 
    119   /**
    120    * Process ID of the child process for this plugin. 0 for none.
    121    */
    122   pid_t cpid;
    123 
    124 };
    125 
    126 
    127 /**
    128  * Create a shared memory area.
    129  *
    130  * @param size size of the shared area
    131  * @return NULL on error
    132  */
    133 struct EXTRACTOR_SharedMemory *
    134 EXTRACTOR_IPC_shared_memory_create_ (size_t size)
    135 {
    136   struct EXTRACTOR_SharedMemory *shm;
    137   const char *tpath;
    138 
    139   if (NULL == (shm = malloc (sizeof (struct EXTRACTOR_SharedMemory))))
    140   {
    141     LOG_STRERROR ("malloc");
    142     return NULL;
    143   }
    144 #if SOMEBSD
    145   /* this works on FreeBSD, not sure about others... */
    146   tpath = getenv ("TMPDIR");
    147   if (NULL == tpath)
    148     tpath = "/tmp/";
    149 #else
    150   tpath = "/"; /* Linux */
    151 #endif
    152   snprintf (shm->shm_name,
    153             MAX_SHM_NAME,
    154             "%sLE-%u-%u",
    155             tpath,
    156             getpid (),
    157             (unsigned int) random ());
    158   if (-1 == (shm->shm_id = shm_open (shm->shm_name,
    159                                      O_RDWR | O_CREAT,
    160                                      S_IRUSR | S_IWUSR)))
    161   {
    162     LOG_STRERROR_FILE ("shm_open",
    163                        shm->shm_name);
    164     free (shm);
    165     return NULL;
    166   }
    167   if ( (0 != ftruncate (shm->shm_id, size)) ||
    168        (NULL == (shm->shm_ptr = mmap (NULL,
    169                                       size,
    170                                       PROT_WRITE,
    171                                       MAP_SHARED,
    172                                       shm->shm_id,
    173                                       0))) ||
    174        (((void*) -1) == shm->shm_ptr) )
    175   {
    176     LOG_STRERROR ("ftruncate/mmap");
    177     (void) close (shm->shm_id);
    178     (void) shm_unlink (shm->shm_name);
    179     free (shm);
    180     return NULL;
    181   }
    182   shm->shm_size = size;
    183   shm->rc = 0;
    184   return shm;
    185 }
    186 
    187 
    188 /**
    189  * Change the reference counter for this shm instance.
    190  *
    191  * @param shm instance to update
    192  * @param delta value to change RC by
    193  * @return new RC
    194  */
    195 unsigned int
    196 EXTRACTOR_IPC_shared_memory_change_rc_ (struct EXTRACTOR_SharedMemory *shm,
    197                                         int delta)
    198 {
    199   shm->rc += delta;
    200   return shm->rc;
    201 }
    202 
    203 
    204 /**
    205  * Destroy shared memory area.
    206  *
    207  * @param shm memory area to destroy
    208  * @return NULL on error
    209  */
    210 void
    211 EXTRACTOR_IPC_shared_memory_destroy_ (struct EXTRACTOR_SharedMemory *shm)
    212 {
    213   munmap (shm->shm_ptr,
    214           shm->shm_size);
    215   (void) close (shm->shm_id);
    216   (void) shm_unlink (shm->shm_name);
    217   free (shm);
    218 }
    219 
    220 
    221 /**
    222  * Initialize shared memory area from data source.
    223  *
    224  * @param shm memory area to initialize
    225  * @param ds data source to use for initialization
    226  * @param off offset to use in data source
    227  * @param size number of bytes to copy
    228  * @return -1 on error, otherwise number of bytes copied
    229  */
    230 ssize_t
    231 EXTRACTOR_IPC_shared_memory_set_ (struct EXTRACTOR_SharedMemory *shm,
    232                                   struct EXTRACTOR_Datasource *ds,
    233                                   uint64_t off,
    234                                   size_t size)
    235 {
    236   if (-1 ==
    237       EXTRACTOR_datasource_seek_ (ds,
    238                                   off,
    239                                   SEEK_SET))
    240   {
    241     LOG ("Failed to set IPC memory due to seek error\n");
    242     return -1;
    243   }
    244   if (size > shm->shm_size)
    245     size = shm->shm_size;
    246   return EXTRACTOR_datasource_read_ (ds,
    247                                      shm->shm_ptr,
    248                                      size);
    249 }
    250 
    251 
    252 /**
    253  * Query datasource for current position
    254  *
    255  * @param ds data source to query
    256  * @return current position in the datasource or UINT_MAX on error
    257  */
    258 uint64_t
    259 EXTRACTOR_datasource_get_pos_ (struct EXTRACTOR_Datasource *ds)
    260 {
    261   int64_t pos = EXTRACTOR_datasource_seek_ (ds,
    262                                             0,
    263                                             SEEK_CUR);
    264 
    265   if (-1 == pos)
    266     return UINT_MAX;
    267   return pos;
    268 }
    269 
    270 
    271 /**
    272  * Create a channel to communicate with a process wrapping
    273  * the plugin of the given name.  Starts the process as well.
    274  *
    275  * @param plugin the plugin
    276  * @param shm memory to share with the process
    277  * @return NULL on error, otherwise IPC channel
    278  */
    279 struct EXTRACTOR_Channel *
    280 EXTRACTOR_IPC_channel_create_ (struct EXTRACTOR_PluginList *plugin,
    281                                struct EXTRACTOR_SharedMemory *shm)
    282 {
    283   struct EXTRACTOR_Channel *channel;
    284   int p1[2];
    285   int p2[2];
    286   pid_t pid;
    287   struct InitMessage *init;
    288   size_t slen;
    289 
    290   if (NULL == (channel = malloc (sizeof (struct EXTRACTOR_Channel))))
    291   {
    292     LOG_STRERROR ("malloc");
    293     return NULL;
    294   }
    295   channel->mdata_size = 1024;
    296   if (NULL == (channel->mdata = malloc (channel->mdata_size)))
    297   {
    298     LOG_STRERROR ("malloc");
    299     free (channel);
    300     return NULL;
    301   }
    302   channel->shm = shm;
    303   channel->plugin = plugin;
    304   channel->size = 0;
    305   if (0 != pipe (p1))
    306   {
    307     LOG_STRERROR ("pipe");
    308     free (channel->mdata);
    309     free (channel);
    310     return NULL;
    311   }
    312   if (0 != pipe (p2))
    313   {
    314     LOG_STRERROR ("pipe");
    315     (void) close (p1[0]);
    316     (void) close (p1[1]);
    317     free (channel->mdata);
    318     free (channel);
    319     return NULL;
    320   }
    321   pid = fork ();
    322   if (pid == -1)
    323   {
    324     LOG_STRERROR ("fork");
    325     (void) close (p1[0]);
    326     (void) close (p1[1]);
    327     (void) close (p2[0]);
    328     (void) close (p2[1]);
    329     free (channel->mdata);
    330     free (channel);
    331     return NULL;
    332   }
    333   if (0 == pid)
    334   {
    335     (void) close (p1[1]);
    336     (void) close (p2[0]);
    337     free (channel->mdata);
    338     free (channel);
    339 #if HAVE_SYS_APPARMOR_H
    340 #if HAVE_APPARMOR
    341     if (0 > aa_change_profile ("libextractor"))
    342     {
    343       int eno = errno;
    344 
    345       if ( (EINVAL != eno) &&
    346            (ENOENT != eno) )
    347       {
    348         fprintf (stderr,
    349                  "Failure changing AppArmor profile: %s\n",
    350                  strerror (errno));
    351         _exit (1);
    352       }
    353     }
    354 #endif
    355 #endif
    356     EXTRACTOR_plugin_main_ (plugin, p1[0], p2[1]);
    357     _exit (0);
    358   }
    359   (void) close (p1[0]);
    360   (void) close (p2[1]);
    361   channel->cpipe_in = p1[1];
    362   channel->cpipe_out = p2[0];
    363   channel->cpid = pid;
    364   slen = strlen (shm->shm_name) + 1;
    365   if (NULL == (init = malloc (sizeof (struct InitMessage) + slen)))
    366   {
    367     LOG_STRERROR ("malloc");
    368     EXTRACTOR_IPC_channel_destroy_ (channel);
    369     return NULL;
    370   }
    371   init->opcode = MESSAGE_INIT_STATE;
    372   init->reserved = 0;
    373   init->reserved2 = 0;
    374   init->shm_name_length = slen;
    375   init->shm_map_size = shm->shm_size;
    376   memcpy (&init[1], shm->shm_name, slen);
    377   if (sizeof (struct InitMessage) + slen !=
    378       EXTRACTOR_IPC_channel_send_ (channel,
    379                                    init,
    380                                    sizeof (struct InitMessage) + slen) )
    381   {
    382     LOG ("Failed to send INIT_STATE message to plugin\n");
    383     EXTRACTOR_IPC_channel_destroy_ (channel);
    384     free (init);
    385     return NULL;
    386   }
    387   free (init);
    388   return channel;
    389 }
    390 
    391 
    392 /**
    393  * Destroy communication channel with a plugin/process.  Also
    394  * destroys the process.
    395  *
    396  * @param channel channel to communicate with the plugin
    397  */
    398 void
    399 EXTRACTOR_IPC_channel_destroy_ (struct EXTRACTOR_Channel *channel)
    400 {
    401   int status;
    402 
    403   if (0 != kill (channel->cpid, SIGKILL))
    404     LOG_STRERROR ("kill");
    405   if (-1 == waitpid (channel->cpid, &status, 0))
    406     LOG_STRERROR ("waitpid");
    407   if (0 != close (channel->cpipe_out))
    408     LOG_STRERROR ("close");
    409   if (0 != close (channel->cpipe_in))
    410     LOG_STRERROR ("close");
    411   if (NULL != channel->plugin)
    412     channel->plugin->channel = NULL;
    413   free (channel->mdata);
    414   free (channel);
    415 }
    416 
    417 
    418 /**
    419  * Send data via the given IPC channel (blocking).
    420  *
    421  * @param channel channel to communicate with the plugin
    422  * @param buf data to send
    423  * @param size number of bytes in buf to send
    424  * @return -1 on error, number of bytes sent on success
    425  *           (never does partial writes)
    426  */
    427 ssize_t
    428 EXTRACTOR_IPC_channel_send_ (struct EXTRACTOR_Channel *channel,
    429                              const void *data,
    430                              size_t size)
    431 {
    432   const char *cdata = data;
    433   size_t off = 0;
    434   ssize_t ret;
    435 
    436   while (off < size)
    437   {
    438     ret = write (channel->cpipe_in, &cdata[off], size - off);
    439     if (ret <= 0)
    440     {
    441       if (-1 == ret)
    442         LOG_STRERROR ("write");
    443       return -1;
    444     }
    445     off += ret;
    446   }
    447   return size;
    448 }
    449 
    450 
    451 /**
    452  * Receive data from any of the given IPC channels (blocking).
    453  * Wait for one of the plugins to reply.
    454  * Selects on plugin output pipes, runs 'receive_reply'
    455  * on each activated pipe until it gets a seek request
    456  * or a done message. Called repeatedly by the user until all pipes are dry or
    457  * broken.
    458  *
    459  * @param channels array of channels, channels that break may be set to NULL
    460  * @param num_channels length of the @a channels array
    461  * @param proc function to call to process messages (may be called
    462  *             more than once)
    463  * @param proc_cls closure for @a proc
    464  * @return -1 on error, 1 on success
    465  */
    466 int
    467 EXTRACTOR_IPC_channel_recv_ (struct EXTRACTOR_Channel **channels,
    468                              unsigned int num_channels,
    469                              EXTRACTOR_ChannelMessageProcessor proc,
    470                              void *proc_cls)
    471 {
    472   struct timeval tv;
    473   fd_set to_check;
    474   int max;
    475   unsigned int i;
    476   struct EXTRACTOR_Channel *channel;
    477   ssize_t ret;
    478   ssize_t iret;
    479   char *ndata;
    480   int closed_channel;
    481 
    482   FD_ZERO (&to_check);
    483   max = -1;
    484   for (i = 0; i<num_channels; i++)
    485   {
    486     channel = channels[i];
    487     if (NULL == channel)
    488       continue;
    489     FD_SET (channel->cpipe_out, &to_check);
    490     if (max < channel->cpipe_out)
    491       max = channel->cpipe_out;
    492   }
    493   if (-1 == max)
    494   {
    495     return 1;   /* nothing left to do! */
    496   }
    497   tv.tv_sec = 0;
    498   tv.tv_usec = 500000; /* 500 ms */
    499   if (0 >= select (max + 1, &to_check, NULL, NULL, &tv))
    500   {
    501     /* an error or timeout -> something's wrong or all plugins hung up */
    502     closed_channel = 0;
    503     for (i = 0; i<num_channels; i++)
    504     {
    505       channel = channels[i];
    506       if (NULL == channel)
    507         continue;
    508       if (-1 == channel->plugin->seek_request)
    509       {
    510         /* plugin blocked for too long, kill channel */
    511         LOG ("Channel blocked, closing channel to %s\n",
    512              channel->plugin->libname);
    513         channel->plugin->channel = NULL;
    514         channel->plugin->round_finished = 1;
    515         EXTRACTOR_IPC_channel_destroy_ (channel);
    516         channels[i] = NULL;
    517         closed_channel = 1;
    518       }
    519     }
    520     if (1 == closed_channel)
    521       return 1;
    522     /* strange, no channel is to blame, let's die just to be safe */
    523     if ((EINTR != errno) && (0 != errno))
    524       LOG_STRERROR ("select");
    525     return -1;
    526   }
    527   for (i = 0; i<num_channels; i++)
    528   {
    529     channel = channels[i];
    530     if (NULL == channel)
    531       continue;
    532     if (! FD_ISSET (channel->cpipe_out, &to_check))
    533       continue;
    534     if (channel->mdata_size == channel->size)
    535     {
    536       /* not enough space, need to grow allocation (if allowed) */
    537       if (MAX_META_DATA == channel->mdata_size)
    538       {
    539         LOG ("Inbound message from channel too large, aborting\n");
    540         EXTRACTOR_IPC_channel_destroy_ (channel);
    541         channels[i] = NULL;
    542         continue;
    543       }
    544       channel->mdata_size *= 2;
    545       if (channel->mdata_size > MAX_META_DATA)
    546         channel->mdata_size = MAX_META_DATA;
    547       if (NULL == (ndata = realloc (channel->mdata,
    548                                     channel->mdata_size)))
    549       {
    550         LOG_STRERROR ("realloc");
    551         EXTRACTOR_IPC_channel_destroy_ (channel);
    552         channels[i] = NULL;
    553         continue;
    554       }
    555       channel->mdata = ndata;
    556     }
    557     if ( (-1 == (iret = read (channel->cpipe_out,
    558                               &channel->mdata[channel->size],
    559                               channel->mdata_size - channel->size)) ) ||
    560          (0 == iret) ||
    561          (-1 == (ret = EXTRACTOR_IPC_process_reply_ (channel->plugin,
    562                                                      channel->mdata,
    563                                                      channel->size + iret,
    564                                                      proc, proc_cls)) ) )
    565     {
    566       if (-1 == iret)
    567         LOG_STRERROR ("read");
    568       LOG ("Read error from channel, closing channel %s\n",
    569            channel->plugin->libname);
    570       EXTRACTOR_IPC_channel_destroy_ (channel);
    571       channels[i] = NULL;
    572       continue;
    573     }
    574     else
    575     {
    576       channel->size = channel->size + iret - ret;
    577       memmove (channel->mdata,
    578                &channel->mdata[ret],
    579                channel->size);
    580     }
    581   }
    582   return 1;
    583 }
    584 
    585 
    586 /* end of extractor_ipc_gnu.c */