exchange

Base system with REST service to issue digital coins, run by the payment service provider
Log | Files | Refs | Submodules | README | LICENSE

taler-helper-auditor-transfer.c (16869B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2017-2024 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
     12 
     13   You should have received a copy of the GNU General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 /**
     17  * @file auditor/taler-helper-auditor-transfer.c
     18  * @brief audits that deposits past due date are
     19  *    aggregated and have a matching wire transfer
     20  * database.
     21  * @author Christian Grothoff
     22  */
     23 #include "platform.h"
     24 #include <gnunet/gnunet_util_lib.h>
     25 #include <gnunet/gnunet_curl_lib.h>
     26 #include "auditordb_lib.h"
     27 #include "exchangedb_lib.h"
     28 #include "taler/taler_json_lib.h"
     29 #include "report-lib.h"
     30 #include "taler/taler_dbevents.h"
     31 #include "auditor-database/delete_early_aggregation.h"
     32 #include "auditor-database/delete_pending_deposit.h"
     33 #include "auditor-database/event_listen.h"
     34 #include "auditor-database/get_auditor_progress.h"
     35 #include "auditor-database/get_balance.h"
     36 #include "auditor-database/insert_auditor_progress.h"
     37 #include "auditor-database/insert_balance.h"
     38 #include "auditor-database/insert_early_aggregation.h"
     39 #include "auditor-database/insert_pending_deposit.h"
     40 #include "auditor-database/preflight.h"
     41 #include "auditor-database/start.h"
     42 #include "auditor-database/update_auditor_progress.h"
     43 #include "auditor-database/update_balance.h"
     44 #include "exchange-database/preflight.h"
     45 #include "exchange-database/rollback.h"
     46 #include "exchange-database/select_aggregations_above_serial.h"
     47 #include "exchange-database/select_batch_deposits_missing_wire.h"
     48 #include "exchange-database/start_read_only.h"
     49 
     50 
     51 /**
     52  * Run in test mode. Exit when idle instead of
     53  * going to sleep and waiting for more work.
     54  */
     55 static int test_mode;
     56 
     57 /**
     58  * Return value from main().
     59  */
     60 static int global_ret;
     61 
     62 /**
     63  * Last reserve_out / wire_out serial IDs seen.
     64  */
     65 static TALER_ARL_DEF_PP (wire_batch_deposit_id);
     66 static TALER_ARL_DEF_PP (wire_aggregation_id);
     67 
     68 /**
     69  * Total amount which the exchange did not aggregate/transfer in time.
     70  */
     71 static TALER_ARL_DEF_AB (total_amount_lag);
     72 
     73 /**
     74  * Total amount which the exchange did aggregate/transfer too early.
     75  */
     76 static TALER_ARL_DEF_AB (total_early_aggregation);
     77 
     78 /**
     79  * Should we run checks that only work for exchange-internal audits?
     80  */
     81 static int internal_checks;
     82 
     83 /**
     84  * Database event handler to wake us up again.
     85  */
     86 static struct GNUNET_DB_EventHandler *eh;
     87 
     88 /**
     89  * The auditors's configuration.
     90  */
     91 static const struct GNUNET_CONFIGURATION_Handle *cfg;
     92 
     93 
     94 /**
     95  * Task run on shutdown.
     96  *
     97  * @param cls NULL
     98  */
     99 static void
    100 do_shutdown (void *cls)
    101 {
    102   (void) cls;
    103   if (NULL != eh)
    104   {
    105     TALER_AUDITORDB_event_listen_cancel (eh);
    106     eh = NULL;
    107   }
    108   TALER_ARL_done ();
    109   TALER_EXCHANGEDB_unload_accounts ();
    110   TALER_ARL_cfg = NULL;
    111 }
    112 
    113 
    114 /**
    115  * Closure for import_wire_missing_cb().
    116  */
    117 struct ImportMissingWireContext
    118 {
    119   /**
    120    * Set to maximum row ID encountered.
    121    */
    122   uint64_t max_batch_deposit_uuid;
    123 
    124   /**
    125    * Set to database errors in callback.
    126    */
    127   enum GNUNET_DB_QueryStatus err;
    128 };
    129 
    130 
    131 /**
    132  * Function called on deposits that need to be checked for their
    133  * wire transfer.
    134  *
    135  * @param cls closure, points to a `struct ImportMissingWireContext`
    136  * @param batch_deposit_serial_id serial of the entry in the batch deposits table
    137  * @param total_amount value of the missing deposits, including fee
    138  * @param wire_target_h_payto where should the funds be wired
    139  * @param deadline what was the earliest requested wire transfer deadline
    140  */
    141 static void
    142 import_wire_missing_cb (
    143   void *cls,
    144   uint64_t batch_deposit_serial_id,
    145   const struct TALER_Amount *total_amount,
    146   const struct TALER_FullPaytoHashP *wire_target_h_payto,
    147   struct GNUNET_TIME_Timestamp deadline)
    148 {
    149   struct ImportMissingWireContext *wc = cls;
    150   enum GNUNET_DB_QueryStatus qs;
    151 
    152   if (wc->err < 0)
    153     return; /* already failed */
    154   GNUNET_assert (batch_deposit_serial_id >= wc->max_batch_deposit_uuid);
    155   wc->max_batch_deposit_uuid = batch_deposit_serial_id + 1;
    156   qs = TALER_AUDITORDB_delete_early_aggregation (
    157     TALER_ARL_adb,
    158     batch_deposit_serial_id);
    159   switch (qs)
    160   {
    161   case GNUNET_DB_STATUS_SOFT_ERROR:
    162     GNUNET_break (0);
    163     wc->err = qs;
    164     return;
    165   case GNUNET_DB_STATUS_HARD_ERROR:
    166     GNUNET_break (0);
    167     wc->err = qs;
    168     return;
    169   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    170     qs = TALER_AUDITORDB_insert_pending_deposit (
    171       TALER_ARL_adb,
    172       batch_deposit_serial_id,
    173       wire_target_h_payto,
    174       total_amount,
    175       deadline);
    176     if (0 > qs)
    177     {
    178       GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    179       wc->err = qs;
    180     }
    181     TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_amount_lag),
    182                           &TALER_ARL_USE_AB (total_amount_lag),
    183                           total_amount);
    184     break;
    185   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    186     TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (total_early_aggregation),
    187                                &TALER_ARL_USE_AB (total_early_aggregation),
    188                                total_amount);
    189     break;
    190   default:
    191     GNUNET_assert (0);
    192   }
    193 }
    194 
    195 
    196 /**
    197  * Checks for wire transfers that should have happened.
    198  *
    199  * @return transaction status
    200  */
    201 static enum GNUNET_DB_QueryStatus
    202 check_for_required_transfers (void)
    203 {
    204   enum GNUNET_DB_QueryStatus qs;
    205   struct ImportMissingWireContext wc = {
    206     .max_batch_deposit_uuid = TALER_ARL_USE_PP (wire_batch_deposit_id),
    207     .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
    208   };
    209 
    210   qs = TALER_EXCHANGEDB_select_batch_deposits_missing_wire (
    211     TALER_ARL_edb,
    212     TALER_ARL_USE_PP (wire_batch_deposit_id),
    213     &import_wire_missing_cb,
    214     &wc);
    215   if (0 > qs)
    216   {
    217     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    218     return qs;
    219   }
    220   if (0 > wc.err)
    221   {
    222     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == wc.err);
    223     return wc.err;
    224   }
    225   TALER_ARL_USE_PP (wire_batch_deposit_id) = wc.max_batch_deposit_uuid;
    226   return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    227 }
    228 
    229 
    230 /**
    231  * Closure for #clear_finished_transfer_cb().
    232  */
    233 struct AggregationContext
    234 {
    235   /**
    236    * Set to maximum row ID encountered.
    237    */
    238   uint64_t max_aggregation_serial;
    239 
    240   /**
    241    * Set to database errors in callback.
    242    */
    243   enum GNUNET_DB_QueryStatus err;
    244 };
    245 
    246 
    247 /**
    248  * Function called on aggregations that were done for
    249  * a (batch) deposit.
    250  *
    251  * @param cls closure
    252  * @param amount affected amount
    253  * @param tracking_serial_id where in the table are we
    254  * @param batch_deposit_serial_id which batch deposit was aggregated
    255  */
    256 static void
    257 clear_finished_transfer_cb (
    258   void *cls,
    259   const struct TALER_Amount *amount,
    260   uint64_t tracking_serial_id,
    261   uint64_t batch_deposit_serial_id)
    262 {
    263   struct AggregationContext *ac = cls;
    264   enum GNUNET_DB_QueryStatus qs;
    265 
    266   if (0 > ac->err)
    267     return; /* already failed */
    268   GNUNET_assert (ac->max_aggregation_serial <= tracking_serial_id);
    269   ac->max_aggregation_serial = tracking_serial_id + 1;
    270   qs = TALER_AUDITORDB_delete_pending_deposit (
    271     TALER_ARL_adb,
    272     batch_deposit_serial_id);
    273   switch (qs)
    274   {
    275   case GNUNET_DB_STATUS_SOFT_ERROR:
    276     GNUNET_break (0);
    277     ac->err = qs;
    278     return;
    279   case GNUNET_DB_STATUS_HARD_ERROR:
    280     GNUNET_break (0);
    281     ac->err = qs;
    282     return;
    283   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    284     qs = TALER_AUDITORDB_insert_early_aggregation (
    285       TALER_ARL_adb,
    286       batch_deposit_serial_id,
    287       tracking_serial_id,
    288       amount);
    289     if (0 > qs)
    290     {
    291       GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    292       ac->err = qs;
    293       return;
    294     }
    295     TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_early_aggregation),
    296                           &TALER_ARL_USE_AB (total_early_aggregation),
    297                           amount);
    298     break;
    299   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    300     TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (total_amount_lag),
    301                                &TALER_ARL_USE_AB (total_amount_lag),
    302                                amount);
    303     break;
    304   default:
    305     GNUNET_assert (0);
    306   }
    307 }
    308 
    309 
    310 /**
    311  * Checks that all wire transfers that should have happened
    312  * (based on deposits) have indeed happened.
    313  *
    314  * @return transaction status
    315  */
    316 static enum GNUNET_DB_QueryStatus
    317 check_for_completed_transfers (void)
    318 {
    319   struct AggregationContext ac = {
    320     .max_aggregation_serial = TALER_ARL_USE_PP (wire_aggregation_id),
    321     .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
    322   };
    323   enum GNUNET_DB_QueryStatus qs;
    324 
    325   qs = TALER_EXCHANGEDB_select_aggregations_above_serial (
    326     TALER_ARL_edb,
    327     TALER_ARL_USE_PP (wire_aggregation_id),
    328     &clear_finished_transfer_cb,
    329     &ac);
    330   if (0 > qs)
    331   {
    332     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    333     return qs;
    334   }
    335   if (0 > ac.err)
    336   {
    337     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == ac.err);
    338     return ac.err;
    339   }
    340   TALER_ARL_USE_PP (wire_aggregation_id) = ac.max_aggregation_serial;
    341   return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    342 }
    343 
    344 
    345 /**
    346  * Start the database transactions and begin the audit.
    347  *
    348  * @return transaction status
    349  */
    350 static enum GNUNET_DB_QueryStatus
    351 begin_transaction (void)
    352 {
    353   enum GNUNET_DB_QueryStatus qs;
    354 
    355   if (GNUNET_SYSERR ==
    356       TALER_EXCHANGEDB_preflight (TALER_ARL_edb))
    357   {
    358     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    359                 "Failed to initialize exchange database connection.\n");
    360     return GNUNET_DB_STATUS_HARD_ERROR;
    361   }
    362   if (GNUNET_SYSERR ==
    363       TALER_AUDITORDB_preflight (TALER_ARL_adb))
    364   {
    365     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    366                 "Failed to initialize auditor database session.\n");
    367     return GNUNET_DB_STATUS_HARD_ERROR;
    368   }
    369   if (GNUNET_OK !=
    370       TALER_AUDITORDB_start (TALER_ARL_adb))
    371   {
    372     GNUNET_break (0);
    373     return GNUNET_DB_STATUS_HARD_ERROR;
    374   }
    375   if (GNUNET_OK !=
    376       TALER_TALER_EXCHANGEDB_start_read_only (TALER_ARL_edb,
    377                                               "transfer auditor"))
    378   {
    379     GNUNET_break (0);
    380     TALER_AUDITORDB_rollback (TALER_ARL_adb);
    381     return GNUNET_DB_STATUS_HARD_ERROR;
    382   }
    383   qs = TALER_AUDITORDB_get_auditor_progress (
    384     TALER_ARL_adb,
    385     TALER_ARL_GET_PP (wire_batch_deposit_id),
    386     TALER_ARL_GET_PP (wire_aggregation_id),
    387     NULL);
    388   if (0 > qs)
    389     goto handle_db_error;
    390 
    391   qs = TALER_AUDITORDB_get_balance (
    392     TALER_ARL_adb,
    393     TALER_ARL_GET_AB (total_amount_lag),
    394     TALER_ARL_GET_AB (total_early_aggregation),
    395     NULL);
    396   if (0 > qs)
    397     goto handle_db_error;
    398   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
    399   {
    400     GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
    401                 "First analysis of with transfer auditor, starting audit from scratch\n");
    402   }
    403   else
    404   {
    405     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    406                 "Resuming transfer audit at %llu / %llu\n",
    407                 (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id),
    408                 (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id));
    409   }
    410 
    411   qs = check_for_required_transfers ();
    412   if (0 > qs)
    413     goto handle_db_error;
    414   qs = check_for_completed_transfers ();
    415   if (0 > qs)
    416     goto handle_db_error;
    417 
    418   qs = TALER_AUDITORDB_update_auditor_progress (
    419     TALER_ARL_adb,
    420     TALER_ARL_SET_PP (wire_batch_deposit_id),
    421     TALER_ARL_SET_PP (wire_aggregation_id),
    422     NULL);
    423   if (0 > qs)
    424     goto handle_db_error;
    425   qs = TALER_AUDITORDB_insert_auditor_progress (
    426     TALER_ARL_adb,
    427     TALER_ARL_SET_PP (wire_batch_deposit_id),
    428     TALER_ARL_SET_PP (wire_aggregation_id),
    429     NULL);
    430   if (0 > qs)
    431     goto handle_db_error;
    432   qs = TALER_AUDITORDB_update_balance (
    433     TALER_ARL_adb,
    434     TALER_ARL_SET_AB (total_amount_lag),
    435     TALER_ARL_SET_AB (total_early_aggregation),
    436     NULL);
    437   if (0 > qs)
    438     goto handle_db_error;
    439   qs = TALER_AUDITORDB_insert_balance (
    440     TALER_ARL_adb,
    441     TALER_ARL_SET_AB (total_amount_lag),
    442     TALER_ARL_SET_AB (total_early_aggregation),
    443     NULL);
    444   if (0 > qs)
    445     goto handle_db_error;
    446   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    447               "Concluded audit step at %llu/%llu\n",
    448               (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id),
    449               (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id));
    450   TALER_EXCHANGEDB_rollback (TALER_ARL_edb);
    451   qs = TALER_AUDITORDB_commit (TALER_ARL_adb);
    452   if (0 > qs)
    453     goto handle_db_error;
    454   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    455               "Transaction concluded!\n");
    456   return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    457 handle_db_error:
    458   TALER_AUDITORDB_rollback (TALER_ARL_adb);
    459   TALER_EXCHANGEDB_rollback (TALER_ARL_edb);
    460   GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    461   return qs;
    462 }
    463 
    464 
    465 /**
    466  * Start auditor process.
    467  */
    468 static void
    469 start (void)
    470 {
    471   enum GNUNET_DB_QueryStatus qs;
    472 
    473   for (unsigned int max_retries = 3; max_retries>0; max_retries--)
    474   {
    475     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    476                 "Trying again (%u attempts left)\n",
    477                 max_retries);
    478     qs = begin_transaction ();
    479     if (GNUNET_DB_STATUS_SOFT_ERROR != qs)
    480       break;
    481   }
    482   if (0 > qs)
    483   {
    484     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    485                 "Audit failed\n");
    486     GNUNET_break (0);
    487     global_ret = EXIT_FAILURE;
    488     GNUNET_SCHEDULER_shutdown ();
    489     return;
    490   }
    491 }
    492 
    493 
    494 /**
    495  * Function called on events received from Postgres.
    496  *
    497  * @param cls closure, NULL
    498  * @param extra additional event data provided
    499  * @param extra_size number of bytes in @a extra
    500  */
    501 static void
    502 db_notify (void *cls,
    503            const void *extra,
    504            size_t extra_size)
    505 {
    506   (void) cls;
    507   (void) extra;
    508   (void) extra_size;
    509 
    510   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    511               "Received notification to wake transfer helper\n");
    512   start ();
    513 }
    514 
    515 
    516 /**
    517  * Main function that will be run.
    518  *
    519  * @param cls closure
    520  * @param args remaining command-line arguments
    521  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
    522  * @param c configuration
    523  */
    524 static void
    525 run (void *cls,
    526      char *const *args,
    527      const char *cfgfile,
    528      const struct GNUNET_CONFIGURATION_Handle *c)
    529 {
    530   (void) cls;
    531   (void) args;
    532   (void) cfgfile;
    533   cfg = c;
    534   if (GNUNET_OK !=
    535       TALER_ARL_init (c))
    536   {
    537     global_ret = EXIT_FAILURE;
    538     return;
    539   }
    540   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
    541                                  NULL);
    542   if (GNUNET_OK !=
    543       TALER_EXCHANGEDB_load_accounts (TALER_ARL_cfg,
    544                                       TALER_EXCHANGEDB_ALO_DEBIT
    545                                       | TALER_EXCHANGEDB_ALO_CREDIT
    546                                       | TALER_EXCHANGEDB_ALO_AUTHDATA))
    547   {
    548     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    549                 "No bank accounts configured\n");
    550     global_ret = EXIT_NOTCONFIGURED;
    551     GNUNET_SCHEDULER_shutdown ();
    552     return;
    553   }
    554   if (0 == test_mode)
    555   {
    556     // FIXME-Optimization: use different event type in the future!
    557     struct GNUNET_DB_EventHeaderP es = {
    558       .size = htons (sizeof (es)),
    559       .type = htons (TALER_DBEVENT_EXCHANGE_AUDITOR_WAKE_HELPER_WIRE)
    560     };
    561 
    562     eh = TALER_AUDITORDB_event_listen (TALER_ARL_adb,
    563                                        &es,
    564                                        GNUNET_TIME_UNIT_FOREVER_REL,
    565                                        &db_notify,
    566                                        NULL);
    567     GNUNET_assert (NULL != eh);
    568   }
    569   start ();
    570 }
    571 
    572 
    573 /**
    574  * The main function of the wire auditing tool. Checks that
    575  * the exchange's records of wire transfers match that of
    576  * the wire gateway.
    577  *
    578  * @param argc number of arguments from the command line
    579  * @param argv command line arguments
    580  * @return 0 ok, 1 on error
    581  */
    582 int
    583 main (int argc,
    584       char *const *argv)
    585 {
    586   const struct GNUNET_GETOPT_CommandLineOption options[] = {
    587     GNUNET_GETOPT_option_flag ('i',
    588                                "internal",
    589                                "perform checks only applicable for exchange-internal audits",
    590                                &internal_checks),
    591     GNUNET_GETOPT_option_flag ('t',
    592                                "test",
    593                                "run in test mode and exit when idle",
    594                                &test_mode),
    595     GNUNET_GETOPT_option_timetravel ('T',
    596                                      "timetravel"),
    597     GNUNET_GETOPT_OPTION_END
    598   };
    599   enum GNUNET_GenericReturnValue ret;
    600 
    601   ret = GNUNET_PROGRAM_run (
    602     TALER_AUDITOR_project_data (),
    603     argc,
    604     argv,
    605     "taler-helper-auditor-transfer",
    606     gettext_noop (
    607       "Audit exchange database for consistency of aggregations/transfers with respect to deposit deadlines"),
    608     options,
    609     &run,
    610     NULL);
    611   if (GNUNET_SYSERR == ret)
    612     return EXIT_INVALIDARGUMENT;
    613   if (GNUNET_NO == ret)
    614     return EXIT_SUCCESS;
    615   return global_ret;
    616 }
    617 
    618 
    619 /* end of taler-helper-auditor-transfer.c */