exchange

Base system with REST service to issue digital coins, run by the payment service provider
Log | Files | Refs | Submodules | README | LICENSE

aggregate.c (9521B)


      1 /*
      2    This file is part of TALER
      3    Copyright (C) 2022, 2023 Taler Systems SA
      4 
      5    TALER is free software; you can redistribute it and/or modify it under the
      6    terms of the GNU General Public License as published by the Free Software
      7    Foundation; either version 3, or (at your option) any later version.
      8 
      9    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
     12 
     13    You should have received a copy of the GNU General Public License along with
     14    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15  */
     16 /**
     17  * @file exchangedb/aggregate.c
     18  * @brief Implementation of the aggregate function for Postgres
     19  * @author Christian Grothoff
     20  */
     21 #include "taler/taler_error_codes.h"
     22 #include "taler/taler_pq_lib.h"
     23 #include "exchange-database/compute_shard.h"
     24 #include "exchange-database/aggregate.h"
     25 #include "helper.h"
     26 
     27 
     28 enum GNUNET_DB_QueryStatus
     29 TALER_EXCHANGEDB_aggregate (
     30   struct TALER_EXCHANGEDB_PostgresContext *pg,
     31   const struct TALER_FullPaytoHashP *h_payto,
     32   const struct TALER_MerchantPublicKeyP *merchant_pub,
     33   const struct TALER_WireTransferIdentifierRawP *wtid,
     34   struct TALER_Amount *total)
     35 {
     36   uint64_t deposit_shard = TALER_EXCHANGEDB_compute_shard (merchant_pub);
     37   struct GNUNET_TIME_Absolute now = {0};
     38   uint64_t sum_deposit_value;
     39   uint64_t sum_deposit_frac;
     40   uint64_t sum_refund_value;
     41   uint64_t sum_refund_frac;
     42   uint64_t sum_fee_value;
     43   uint64_t sum_fee_frac;
     44   enum GNUNET_DB_QueryStatus qs;
     45   struct TALER_Amount sum_deposit;
     46   struct TALER_Amount sum_refund;
     47   struct TALER_Amount sum_fee;
     48   struct TALER_Amount delta;
     49 
     50   now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (),
     51                                          pg->aggregator_shift);
     52   PREPARE (pg,
     53            "aggregate",
     54            "WITH bdep AS (" /* restrict to our merchant and account and mark as done */
     55            "  UPDATE batch_deposits"
     56            "     SET done=TRUE"
     57            "   WHERE NOT (done OR policy_blocked)" /* only actually executable deposits */
     58            "     AND refund_deadline<$1"
     59            "     AND shard=$5" /* only for efficiency, merchant_pub is what we really filter by */
     60            "     AND merchant_pub=$2" /* filter by target merchant */
     61            "     AND wire_target_h_payto=$3" /* merchant could have a 2nd bank account */
     62            "   RETURNING"
     63            "     batch_deposit_serial_id)"
     64            " ,cdep AS ("
     65            "   SELECT"
     66            "     coin_deposit_serial_id"
     67            "    ,batch_deposit_serial_id"
     68            "    ,coin_pub"
     69            "    ,amount_with_fee AS amount"
     70            "   FROM coin_deposits"
     71            "   WHERE batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))"
     72            " ,ref AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
     73            "  SELECT"
     74            "    amount_with_fee AS refund"
     75            "   ,coin_pub"
     76            "   ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
     77            "    FROM refunds"
     78            "   WHERE coin_pub IN (SELECT coin_pub FROM cdep)"
     79            "     AND batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))"
     80            " ,ref_by_coin AS (" /* total up refunds by coin */
     81            "  SELECT"
     82            "    SUM((ref.refund).val) AS sum_refund_val"
     83            "   ,SUM((ref.refund).frac) AS sum_refund_frac"
     84            "   ,coin_pub"
     85            "   ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
     86            "    FROM ref"
     87            "   GROUP BY coin_pub, batch_deposit_serial_id)"
     88            " ,norm_ref_by_coin AS (" /* normalize */
     89            "  SELECT"
     90            "    sum_refund_val + sum_refund_frac / 100000000 AS norm_refund_val"
     91            "   ,sum_refund_frac % 100000000 AS norm_refund_frac"
     92            "   ,coin_pub"
     93            "   ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
     94            "    FROM ref_by_coin)"
     95            " ,fully_refunded_coins AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
     96            "  SELECT"
     97            "    cdep.coin_pub"
     98            "    FROM norm_ref_by_coin norm"
     99            "    JOIN cdep"
    100            "      ON (norm.coin_pub = cdep.coin_pub"
    101            "      AND norm.batch_deposit_serial_id = cdep.batch_deposit_serial_id"
    102            "      AND norm.norm_refund_val = (cdep.amount).val"
    103            "      AND norm.norm_refund_frac = (cdep.amount).frac))"
    104            " ,fees AS (" /* find deposit fees for not fully refunded deposits */
    105            "  SELECT"
    106            "    denom.fee_deposit AS fee"
    107            "   ,cs.batch_deposit_serial_id" /* ensures we get the fee for each coin, not once per denomination */
    108            "    FROM cdep cs"
    109            "    JOIN known_coins kc" /* NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
    110            "      USING (coin_pub)"
    111            "    JOIN denominations denom"
    112            "      USING (denominations_serial)"
    113            "    WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins))"
    114            " ,dummy AS (" /* add deposits to aggregation_tracking */
    115            "    INSERT INTO aggregation_tracking"
    116            "    (batch_deposit_serial_id"
    117            "    ,wtid_raw)"
    118            "    SELECT batch_deposit_serial_id,$4"
    119            "      FROM bdep)"
    120            "SELECT" /* calculate totals (deposits, refunds and fees) */
    121            "  CAST(COALESCE(SUM((cdep.amount).val),0) AS INT8) AS sum_deposit_value"
    122            /* cast needed, otherwise we get NUMBER */
    123            " ,COALESCE(SUM((cdep.amount).frac),0) AS sum_deposit_fraction" /* SUM over INT returns INT8 */
    124            " ,CAST(COALESCE(SUM((ref.refund).val),0) AS INT8) AS sum_refund_value"
    125            " ,COALESCE(SUM((ref.refund).frac),0) AS sum_refund_fraction"
    126            " ,CAST(COALESCE(SUM((fees.fee).val),0) AS INT8) AS sum_fee_value"
    127            " ,COALESCE(SUM((fees.fee).frac),0) AS sum_fee_fraction"
    128            " FROM cdep "
    129            "   FULL OUTER JOIN ref ON (FALSE)"    /* We just want all sums */
    130            "   FULL OUTER JOIN fees ON (FALSE);");
    131 
    132   {
    133     struct GNUNET_PQ_QueryParam params[] = {
    134       GNUNET_PQ_query_param_absolute_time (&now),
    135       GNUNET_PQ_query_param_auto_from_type (merchant_pub),
    136       GNUNET_PQ_query_param_auto_from_type (h_payto),
    137       GNUNET_PQ_query_param_auto_from_type (wtid),
    138       GNUNET_PQ_query_param_uint64 (&deposit_shard),
    139       GNUNET_PQ_query_param_end
    140     };
    141     struct GNUNET_PQ_ResultSpec rs[] = {
    142       GNUNET_PQ_result_spec_uint64 ("sum_deposit_value",
    143                                     &sum_deposit_value),
    144       GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction",
    145                                     &sum_deposit_frac),
    146       GNUNET_PQ_result_spec_uint64 ("sum_refund_value",
    147                                     &sum_refund_value),
    148       GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction",
    149                                     &sum_refund_frac),
    150       GNUNET_PQ_result_spec_uint64 ("sum_fee_value",
    151                                     &sum_fee_value),
    152       GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction",
    153                                     &sum_fee_frac),
    154       GNUNET_PQ_result_spec_end
    155     };
    156 
    157     qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
    158                                                    "aggregate",
    159                                                    params,
    160                                                    rs);
    161   }
    162   if (qs < 0)
    163   {
    164     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    165     return qs;
    166   }
    167   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
    168   {
    169     GNUNET_assert (GNUNET_OK ==
    170                    TALER_amount_set_zero (pg->currency,
    171                                           total));
    172     return qs;
    173   }
    174   GNUNET_assert (GNUNET_OK ==
    175                  TALER_amount_set_zero (pg->currency,
    176                                         &sum_deposit));
    177   GNUNET_assert (GNUNET_OK ==
    178                  TALER_amount_set_zero (pg->currency,
    179                                         &sum_refund));
    180   GNUNET_assert (GNUNET_OK ==
    181                  TALER_amount_set_zero (pg->currency,
    182                                         &sum_fee));
    183   sum_deposit.value    = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE
    184                          + sum_deposit_value;
    185   sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE;
    186   sum_refund.value     = sum_refund_frac  / TALER_AMOUNT_FRAC_BASE
    187                          + sum_refund_value;
    188   sum_refund.fraction  = sum_refund_frac  % TALER_AMOUNT_FRAC_BASE;
    189   sum_fee.value        = sum_fee_frac     / TALER_AMOUNT_FRAC_BASE
    190                          + sum_fee_value;
    191   sum_fee.fraction     = sum_fee_frac     % TALER_AMOUNT_FRAC_BASE; \
    192   GNUNET_assert (0 <=
    193                  TALER_amount_subtract (&delta,
    194                                         &sum_deposit,
    195                                         &sum_refund));
    196   GNUNET_assert (0 <=
    197                  TALER_amount_subtract (total,
    198                                         &delta,
    199                                         &sum_fee));
    200   return qs;
    201 }