exchange

Base system with REST service to issue digital coins, run by the payment service provider
Log | Files | Refs | Submodules | README | LICENSE

reserves_in_insert.c (11808B)


      1 /*
      2    This file is part of TALER
      3    Copyright (C) 2022-2024 Taler Systems SA
      4 
      5    TALER is free software; you can redistribute it and/or modify it under the
      6    terms of the GNU General Public License as published by the Free Software
      7    Foundation; either version 3, or (at your option) any later version.
      8 
      9    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
     12 
     13    You should have received a copy of the GNU General Public License along with
     14    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15  */
     16 /**
     17  * @file exchangedb/reserves_in_insert.c
     18  * @brief Implementation of the reserves_in_insert function for Postgres
     19  * @author Christian Grothoff
     20  * @author Joseph Xu
     21  */
     22 #include "taler/taler_pq_lib.h"
     23 #include "exchange-database/reserves_in_insert.h"
     24 #include "helper.h"
     25 #include "exchange-database/start.h"
     26 #include "exchange-database/start_read_committed.h"
     27 #include "exchange-database/commit.h"
     28 #include "exchange-database/preflight.h"
     29 #include "exchange-database/rollback.h"
     30 
     31 
     32 /**
     33  * Generate event notification for the reserve change.
     34  *
     35  * @param reserve_pub reserve to notfiy on
     36  * @return string to pass to postgres for the notification
     37  */
     38 static char *
     39 compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
     40 {
     41   struct TALER_EXCHANGEDB_ReserveEventP rep = {
     42     .header.size = htons (sizeof (rep)),
     43     .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING),
     44     .reserve_pub = *reserve_pub
     45   };
     46 
     47   return GNUNET_PQ_get_event_notify_channel (&rep.header);
     48 }
     49 
     50 
     51 /**
     52  * Closure for our helper_cb()
     53  */
     54 struct Context
     55 {
     56   /**
     57    * Array of reserve UUIDs to initialize.
     58    */
     59   uint64_t *reserve_uuids;
     60 
     61   /**
     62    * Array with entries set to 'true' for duplicate transactions.
     63    */
     64   bool *transaction_duplicates;
     65 
     66   /**
     67    * Array with entries set to 'true' for rows with conflicts.
     68    */
     69   bool *conflicts;
     70 
     71   /**
     72    * Set to #GNUNET_SYSERR on failures.
     73    */
     74   enum GNUNET_GenericReturnValue status;
     75 
     76   /**
     77    * Single value (no array) set to true if we need
     78    * to follow-up with an update.
     79    */
     80   bool needs_update;
     81 };
     82 
     83 
     84 /**
     85  * Helper function to be called with the results of a SELECT statement
     86  * that has returned @a num_results results.
     87  *
     88  * @param cls closure of type `struct Context *`
     89  * @param result the postgres result
     90  * @param num_results the number of results in @a result
     91  */
     92 static void
     93 helper_cb (void *cls,
     94            PGresult *result,
     95            unsigned int num_results)
     96 {
     97   struct Context *ctx = cls;
     98 
     99   for (unsigned int i = 0; i<num_results; i++)
    100   {
    101     struct GNUNET_PQ_ResultSpec rs[] = {
    102       GNUNET_PQ_result_spec_bool (
    103         "transaction_duplicate",
    104         &ctx->transaction_duplicates[i]),
    105       GNUNET_PQ_result_spec_allow_null (
    106         GNUNET_PQ_result_spec_uint64 ("ruuid",
    107                                       &ctx->reserve_uuids[i]),
    108         &ctx->conflicts[i]),
    109       GNUNET_PQ_result_spec_end
    110     };
    111 
    112     if (GNUNET_OK !=
    113         GNUNET_PQ_extract_result (result,
    114                                   rs,
    115                                   i))
    116     {
    117       GNUNET_break (0);
    118       ctx->status = GNUNET_SYSERR;
    119       return;
    120     }
    121     if (! ctx->transaction_duplicates[i])
    122       ctx->needs_update |= ctx->conflicts[i];
    123   }
    124 }
    125 
    126 
    127 enum GNUNET_DB_QueryStatus
    128 TALER_EXCHANGEDB_reserves_in_insert (
    129   struct TALER_EXCHANGEDB_PostgresContext *pg,
    130   const struct TALER_EXCHANGEDB_ReserveInInfo *reserves,
    131   unsigned int reserves_length,
    132   enum GNUNET_DB_QueryStatus *results)
    133 {
    134   unsigned int dups = 0;
    135 
    136   struct TALER_FullPaytoHashP h_full_paytos[
    137     GNUNET_NZL (reserves_length)];
    138   struct TALER_NormalizedPaytoHashP h_normalized_paytos[
    139     GNUNET_NZL (reserves_length)];
    140   char *notify_s[GNUNET_NZL (reserves_length)];
    141   struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)];
    142   struct TALER_Amount balances[GNUNET_NZL (reserves_length)];
    143   struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)];
    144   const char *sender_account_details[GNUNET_NZL (reserves_length)];
    145   const char *exchange_account_names[GNUNET_NZL (reserves_length)];
    146   uint64_t wire_references[GNUNET_NZL (reserves_length)];
    147   uint64_t reserve_uuids[GNUNET_NZL (reserves_length)];
    148   bool transaction_duplicates[GNUNET_NZL (reserves_length)];
    149   bool conflicts[GNUNET_NZL (reserves_length)];
    150   struct GNUNET_TIME_Timestamp reserve_expiration
    151     = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
    152   struct GNUNET_TIME_Timestamp gc
    153     = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time);
    154   enum GNUNET_DB_QueryStatus qs;
    155   bool need_update;
    156 
    157   for (unsigned int i = 0; i<reserves_length; i++)
    158   {
    159     const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
    160 
    161     TALER_full_payto_hash (reserve->sender_account_details,
    162                            &h_full_paytos[i]);
    163     TALER_full_payto_normalize_and_hash (reserve->sender_account_details,
    164                                          &h_normalized_paytos[i]);
    165     notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub);
    166     reserve_pubs[i] = *reserve->reserve_pub;
    167     balances[i] = *reserve->balance;
    168     execution_times[i] = reserve->execution_time;
    169     sender_account_details[i] = reserve->sender_account_details.full_payto;
    170     exchange_account_names[i] = reserve->exchange_account_name;
    171     wire_references[i] = reserve->wire_reference;
    172   }
    173 
    174   /* NOTE: kind-of pointless to explicitly start a transaction here... */
    175   if (GNUNET_OK !=
    176       TALER_EXCHANGEDB_preflight (pg))
    177   {
    178     GNUNET_break (0);
    179     qs = GNUNET_DB_STATUS_HARD_ERROR;
    180     goto finished;
    181   }
    182   if (GNUNET_OK !=
    183       TALER_TALER_EXCHANGEDB_start_read_committed (pg,
    184                                                    "READ_COMMITED"))
    185   {
    186     GNUNET_break (0);
    187     qs = GNUNET_DB_STATUS_HARD_ERROR;
    188     goto finished;
    189   }
    190   PREPARE (pg,
    191            "reserves_insert_with_array",
    192            "SELECT"
    193            " transaction_duplicate"
    194            ",ruuid"
    195            " FROM exchange_do_array_reserves_insert"
    196            " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);");
    197   {
    198     struct GNUNET_PQ_QueryParam params[] = {
    199       GNUNET_PQ_query_param_timestamp (&gc),
    200       GNUNET_PQ_query_param_timestamp (&reserve_expiration),
    201       GNUNET_PQ_query_param_array_auto_from_type (reserves_length,
    202                                                   reserve_pubs,
    203                                                   pg->conn),
    204       GNUNET_PQ_query_param_array_uint64 (reserves_length,
    205                                           wire_references,
    206                                           pg->conn),
    207       TALER_PQ_query_param_array_amount (
    208         reserves_length,
    209         balances,
    210         pg->conn),
    211       GNUNET_PQ_query_param_array_ptrs_string (
    212         reserves_length,
    213         (const char **) exchange_account_names,
    214         pg->conn),
    215       GNUNET_PQ_query_param_array_timestamp (
    216         reserves_length,
    217         execution_times,
    218         pg->conn),
    219       GNUNET_PQ_query_param_array_auto_from_type (
    220         reserves_length,
    221         h_full_paytos,
    222         pg->conn),
    223       GNUNET_PQ_query_param_array_auto_from_type (
    224         reserves_length,
    225         h_normalized_paytos,
    226         pg->conn),
    227       GNUNET_PQ_query_param_array_ptrs_string (
    228         reserves_length,
    229         (const char **) sender_account_details,
    230         pg->conn),
    231       GNUNET_PQ_query_param_array_ptrs_string (
    232         reserves_length,
    233         (const char **) notify_s,
    234         pg->conn),
    235       GNUNET_PQ_query_param_end
    236     };
    237     struct Context ctx = {
    238       .reserve_uuids = reserve_uuids,
    239       .transaction_duplicates = transaction_duplicates,
    240       .conflicts = conflicts,
    241       .needs_update = false,
    242       .status = GNUNET_OK
    243     };
    244 
    245     qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn,
    246                                                "reserves_insert_with_array",
    247                                                params,
    248                                                &helper_cb,
    249                                                &ctx);
    250     GNUNET_PQ_cleanup_query_params_closures (params);
    251     if ( (qs < 0) ||
    252          (GNUNET_OK != ctx.status) )
    253     {
    254       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    255                   "Failed to insert into reserves (%d)\n",
    256                   qs);
    257       goto finished;
    258     }
    259     need_update = ctx.needs_update;
    260   }
    261 
    262   {
    263     enum GNUNET_DB_QueryStatus cs;
    264 
    265     cs = TALER_EXCHANGEDB_commit (pg);
    266     if (cs < 0)
    267     {
    268       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    269                   "Failed to commit\n");
    270       qs = cs;
    271       goto finished;
    272     }
    273   }
    274 
    275   for (unsigned int i = 0; i<reserves_length; i++)
    276   {
    277     if (transaction_duplicates[i])
    278       dups++;
    279     results[i] = transaction_duplicates[i]
    280       ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
    281       : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    282   }
    283 
    284   if (! need_update)
    285   {
    286     qs = reserves_length;
    287     goto finished;
    288   }
    289   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    290               "Reserve update needed for some reserves in the batch\n");
    291   PREPARE (pg,
    292            "reserves_update",
    293            "SELECT"
    294            " out_duplicate AS duplicate "
    295            "FROM exchange_do_batch_reserves_update"
    296            " ($1,$2,$3,$4,$5,$6,$7);");
    297 
    298   if (GNUNET_OK !=
    299       TALER_EXCHANGEDB_start (pg,
    300                               "reserve-insert-continued"))
    301   {
    302     GNUNET_break (0);
    303     qs = GNUNET_DB_STATUS_HARD_ERROR;
    304     goto finished;
    305   }
    306 
    307   for (unsigned int i = 0; i<reserves_length; i++)
    308   {
    309     if (transaction_duplicates[i])
    310       continue;
    311     if (! conflicts[i])
    312       continue;
    313     {
    314       bool duplicate;
    315       struct GNUNET_PQ_QueryParam params[] = {
    316         GNUNET_PQ_query_param_auto_from_type (&reserve_pubs[i]),
    317         GNUNET_PQ_query_param_timestamp (&reserve_expiration),
    318         GNUNET_PQ_query_param_uint64 (&wire_references[i]),
    319         TALER_PQ_query_param_amount (pg->conn,
    320                                      &balances[i]),
    321         GNUNET_PQ_query_param_string (exchange_account_names[i]),
    322         GNUNET_PQ_query_param_auto_from_type (&h_full_paytos[i]),
    323         GNUNET_PQ_query_param_string (notify_s[i]),
    324         GNUNET_PQ_query_param_end
    325       };
    326       struct GNUNET_PQ_ResultSpec rs[] = {
    327         GNUNET_PQ_result_spec_bool ("duplicate",
    328                                     &duplicate),
    329         GNUNET_PQ_result_spec_end
    330       };
    331       enum GNUNET_DB_QueryStatus qsi;
    332 
    333       qsi = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
    334                                                       "reserves_update",
    335                                                       params,
    336                                                       rs);
    337       if (qsi < 0)
    338       {
    339         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    340                     "Failed to update reserves (%d)\n",
    341                     qsi);
    342         results[i] = qsi;
    343         goto finished;
    344       }
    345       results[i] = duplicate
    346           ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
    347           : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    348     }
    349   }
    350   {
    351     enum GNUNET_DB_QueryStatus cs;
    352 
    353     cs = TALER_EXCHANGEDB_commit (pg);
    354     if (cs < 0)
    355     {
    356       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    357                   "Failed to commit\n");
    358       qs = cs;
    359       goto finished;
    360     }
    361   }
    362 finished:
    363   for (unsigned int i = 0; i<reserves_length; i++)
    364     GNUNET_free (notify_s[i]);
    365   if (qs < 0)
    366     return qs;
    367   GNUNET_PQ_event_do_poll (pg->conn);
    368   if (0 != dups)
    369     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    370                 "%u/%u duplicates among incoming transactions. Try increasing WIREWATCH_IDLE_SLEEP_INTERVAL in the [exchange] configuration section (if this happens a lot).\n",
    371                 dups,
    372                 reserves_length);
    373   return qs;
    374 }