exchange

Base system with REST service to issue digital coins, run by the payment service provider
Log | Files | Refs | Submodules | README | LICENSE

begin_shard.c (9218B)


      1 /*
      2    This file is part of TALER
      3    Copyright (C) 2022 Taler Systems SA
      4 
      5    TALER is free software; you can redistribute it and/or modify it under the
      6    terms of the GNU General Public License as published by the Free Software
      7    Foundation; either version 3, or (at your option) any later version.
      8 
      9    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
     12 
     13    You should have received a copy of the GNU General Public License along with
     14    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15  */
     16 /**
     17  * @file exchangedb/begin_shard.c
     18  * @brief Implementation of the begin_shard function for Postgres
     19  * @author Christian Grothoff
     20  */
     21 #include "taler/taler_pq_lib.h"
     22 #include "exchange-database/begin_shard.h"
     23 #include "helper.h"
     24 #include "exchange-database/start.h"
     25 #include "exchange-database/rollback.h"
     26 #include "exchange-database/commit.h"
     27 
     28 
     29 enum GNUNET_DB_QueryStatus
     30 TALER_EXCHANGEDB_begin_shard (struct TALER_EXCHANGEDB_PostgresContext *pg,
     31                               const char *job_name,
     32                               struct GNUNET_TIME_Relative delay,
     33                               uint64_t shard_size,
     34                               uint64_t *start_row,
     35                               uint64_t *end_row)
     36 {
     37 
     38   for (unsigned int retries = 0; retries<10; retries++)
     39   {
     40     if (GNUNET_OK !=
     41         TALER_EXCHANGEDB_start (pg,
     42                                 "begin_shard"))
     43     {
     44       GNUNET_break (0);
     45       return GNUNET_DB_STATUS_HARD_ERROR;
     46     }
     47 
     48     {
     49       struct GNUNET_TIME_Absolute past;
     50       enum GNUNET_DB_QueryStatus qs;
     51       struct GNUNET_PQ_QueryParam params[] = {
     52         GNUNET_PQ_query_param_string (job_name),
     53         GNUNET_PQ_query_param_absolute_time (&past),
     54         GNUNET_PQ_query_param_end
     55       };
     56       struct GNUNET_PQ_ResultSpec rs[] = {
     57         GNUNET_PQ_result_spec_uint64 ("start_row",
     58                                       start_row),
     59         GNUNET_PQ_result_spec_uint64 ("end_row",
     60                                       end_row),
     61         GNUNET_PQ_result_spec_end
     62       };
     63 
     64       past = GNUNET_TIME_absolute_get ();
     65       PREPARE (pg,
     66                "get_open_shard",
     67                "SELECT"
     68                " start_row"
     69                ",end_row"
     70                " FROM work_shards"
     71                " WHERE job_name=$1"
     72                "   AND completed=FALSE"
     73                "   AND last_attempt<$2"
     74                " ORDER BY last_attempt ASC"
     75                " LIMIT 1;");
     76       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     77                                                      "get_open_shard",
     78                                                      params,
     79                                                      rs);
     80       switch (qs)
     81       {
     82       case GNUNET_DB_STATUS_HARD_ERROR:
     83         GNUNET_break (0);
     84         TALER_EXCHANGEDB_rollback (pg);
     85         return qs;
     86       case GNUNET_DB_STATUS_SOFT_ERROR:
     87         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     88                     "Serialization error on getting open shard\n");
     89         TALER_EXCHANGEDB_rollback (pg);
     90         continue;
     91       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     92         {
     93           enum GNUNET_DB_QueryStatus qsz;
     94           struct GNUNET_TIME_Absolute now;
     95           struct GNUNET_PQ_QueryParam iparams[] = {
     96             GNUNET_PQ_query_param_string (job_name),
     97             GNUNET_PQ_query_param_absolute_time (&now),
     98             GNUNET_PQ_query_param_uint64 (start_row),
     99             GNUNET_PQ_query_param_uint64 (end_row),
    100             GNUNET_PQ_query_param_end
    101           };
    102 
    103           now = GNUNET_TIME_relative_to_absolute (delay);
    104           PREPARE (pg,
    105                    "reclaim_shard",
    106                    "UPDATE work_shards"
    107                    " SET last_attempt=$2"
    108                    " WHERE job_name=$1"
    109                    "   AND start_row=$3"
    110                    "   AND end_row=$4");
    111           qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
    112                                                     "reclaim_shard",
    113                                                     iparams);
    114           switch (qsz)
    115           {
    116           case GNUNET_DB_STATUS_HARD_ERROR:
    117             GNUNET_break (0);
    118             TALER_EXCHANGEDB_rollback (pg);
    119             return qsz;
    120           case GNUNET_DB_STATUS_SOFT_ERROR:
    121             GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    122                         "Serialization error on claiming open shard\n");
    123             TALER_EXCHANGEDB_rollback (pg);
    124             continue;
    125           case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    126             goto commit;
    127           case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    128             GNUNET_break (0); /* logic error, should be impossible */
    129             TALER_EXCHANGEDB_rollback (pg);
    130             return GNUNET_DB_STATUS_HARD_ERROR;
    131           }
    132         }
    133         break; /* actually unreachable */
    134       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    135         break; /* continued below */
    136       }
    137     } /* get_open_shard */
    138 
    139     /* No open shard, find last 'end_row' */
    140     {
    141       enum GNUNET_DB_QueryStatus qs;
    142       struct GNUNET_PQ_QueryParam params[] = {
    143         GNUNET_PQ_query_param_string (job_name),
    144         GNUNET_PQ_query_param_end
    145       };
    146       struct GNUNET_PQ_ResultSpec rs[] = {
    147         GNUNET_PQ_result_spec_uint64 ("end_row",
    148                                       start_row),
    149         GNUNET_PQ_result_spec_end
    150       };
    151 
    152       PREPARE (pg,
    153                "get_last_shard",
    154                "SELECT"
    155                " end_row"
    156                " FROM work_shards"
    157                " WHERE job_name=$1"
    158                " ORDER BY end_row DESC"
    159                " LIMIT 1;");
    160       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
    161                                                      "get_last_shard",
    162                                                      params,
    163                                                      rs);
    164       switch (qs)
    165       {
    166       case GNUNET_DB_STATUS_HARD_ERROR:
    167         GNUNET_break (0);
    168         TALER_EXCHANGEDB_rollback (pg);
    169         return qs;
    170       case GNUNET_DB_STATUS_SOFT_ERROR:
    171         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    172                     "Serialization error on getting last shard\n");
    173         TALER_EXCHANGEDB_rollback (pg);
    174         continue;
    175       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    176         break;
    177       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    178         *start_row = 0; /* base-case: no shards yet */
    179         break; /* continued below */
    180       }
    181       *end_row = *start_row + shard_size;
    182     } /* get_last_shard */
    183 
    184     /* Claim fresh shard */
    185     {
    186       enum GNUNET_DB_QueryStatus qs;
    187       struct GNUNET_TIME_Absolute now;
    188       struct GNUNET_PQ_QueryParam params[] = {
    189         GNUNET_PQ_query_param_string (job_name),
    190         GNUNET_PQ_query_param_absolute_time (&now),
    191         GNUNET_PQ_query_param_uint64 (start_row),
    192         GNUNET_PQ_query_param_uint64 (end_row),
    193         GNUNET_PQ_query_param_end
    194       };
    195 
    196       now = GNUNET_TIME_relative_to_absolute (delay);
    197       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    198                   "Trying to claim shard (%llu-%llu]\n",
    199                   (unsigned long long) *start_row,
    200                   (unsigned long long) *end_row);
    201 
    202       PREPARE (pg,
    203                "claim_next_shard",
    204                "INSERT INTO work_shards"
    205                "(job_name"
    206                ",last_attempt"
    207                ",start_row"
    208                ",end_row"
    209                ") VALUES "
    210                "($1, $2, $3, $4);");
    211       qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
    212                                                "claim_next_shard",
    213                                                params);
    214       switch (qs)
    215       {
    216       case GNUNET_DB_STATUS_HARD_ERROR:
    217         GNUNET_break (0);
    218         TALER_EXCHANGEDB_rollback (pg);
    219         return qs;
    220       case GNUNET_DB_STATUS_SOFT_ERROR:
    221         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    222                     "Serialization error on claiming next shard\n");
    223         TALER_EXCHANGEDB_rollback (pg);
    224         continue;
    225       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    226         /* continued below */
    227         break;
    228       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    229         /* someone else got this shard already,
    230            try again */
    231         TALER_EXCHANGEDB_rollback (pg);
    232         continue;
    233       }
    234     } /* claim_next_shard */
    235 
    236     /* commit */
    237 commit:
    238     {
    239       enum GNUNET_DB_QueryStatus qs;
    240 
    241       qs = TALER_EXCHANGEDB_commit (pg);
    242       switch (qs)
    243       {
    244       case GNUNET_DB_STATUS_HARD_ERROR:
    245         GNUNET_break (0);
    246         TALER_EXCHANGEDB_rollback (pg);
    247         return qs;
    248       case GNUNET_DB_STATUS_SOFT_ERROR:
    249         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    250                     "Serialization error on commit for beginning shard\n");
    251         TALER_EXCHANGEDB_rollback (pg);
    252         continue;
    253       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    254       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    255         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    256                     "Claimed new shard\n");
    257         return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    258       }
    259     }
    260   } /* retry 'for' loop */
    261   return GNUNET_DB_STATUS_SOFT_ERROR;
    262 }