exchange

Base system with REST service to issue digital coins, run by the payment service provider
Log | Files | Refs | Submodules | README | LICENSE

begin_revolving_shard.c (8973B)


      1 /*
      2    This file is part of TALER
      3    Copyright (C) 2022 Taler Systems SA
      4 
      5    TALER is free software; you can redistribute it and/or modify it under the
      6    terms of the GNU General Public License as published by the Free Software
      7    Foundation; either version 3, or (at your option) any later version.
      8 
      9    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
     12 
     13    You should have received a copy of the GNU General Public License along with
     14    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15  */
     16 /**
     17  * @file exchangedb/begin_revolving_shard.c
     18  * @brief Implementation of the begin_revolving_shard function for Postgres
     19  * @author Christian Grothoff
     20  */
     21 #include "taler/taler_pq_lib.h"
     22 #include "exchange-database/begin_revolving_shard.h"
     23 #include "exchange-database/commit.h"
     24 #include "helper.h"
     25 #include "exchange-database/start.h"
     26 #include "exchange-database/rollback.h"
     27 
     28 enum GNUNET_DB_QueryStatus
     29 TALER_EXCHANGEDB_begin_revolving_shard (
     30   struct TALER_EXCHANGEDB_PostgresContext *pg,
     31   const char *job_name,
     32   uint32_t shard_size,
     33   uint32_t shard_limit,
     34   uint32_t *start_row,
     35   uint32_t *end_row)
     36 {
     37 
     38   GNUNET_assert (shard_limit <= 1U + (uint32_t) INT_MAX);
     39   GNUNET_assert (shard_limit > 0);
     40   GNUNET_assert (shard_size > 0);
     41   for (unsigned int retries = 0; retries<3; retries++)
     42   {
     43     if (GNUNET_OK !=
     44         TALER_EXCHANGEDB_start (pg,
     45                                 "begin_revolving_shard"))
     46     {
     47       GNUNET_break (0);
     48       return GNUNET_DB_STATUS_HARD_ERROR;
     49     }
     50 
     51     /* First, find last 'end_row' */
     52     {
     53       enum GNUNET_DB_QueryStatus qs;
     54       uint32_t last_end;
     55       struct GNUNET_PQ_QueryParam params[] = {
     56         GNUNET_PQ_query_param_string (job_name),
     57         GNUNET_PQ_query_param_end
     58       };
     59       struct GNUNET_PQ_ResultSpec rs[] = {
     60         GNUNET_PQ_result_spec_uint32 ("end_row",
     61                                       &last_end),
     62         GNUNET_PQ_result_spec_end
     63       };
     64       /* Used in #postgres_begin_revolving_shard() */
     65       PREPARE (pg,
     66                "get_last_revolving_shard",
     67                "SELECT"
     68                " end_row"
     69                " FROM revolving_work_shards"
     70                " WHERE job_name=$1"
     71                " ORDER BY end_row DESC"
     72                " LIMIT 1;");
     73       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     74                                                      "get_last_revolving_shard",
     75                                                      params,
     76                                                      rs);
     77       switch (qs)
     78       {
     79       case GNUNET_DB_STATUS_HARD_ERROR:
     80         GNUNET_break (0);
     81         TALER_EXCHANGEDB_rollback (pg);
     82         return qs;
     83       case GNUNET_DB_STATUS_SOFT_ERROR:
     84         TALER_EXCHANGEDB_rollback (pg);
     85         continue;
     86       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     87         *start_row = 1U + last_end;
     88         break;
     89       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     90         *start_row = 0; /* base-case: no shards yet */
     91         break; /* continued below */
     92       }
     93     } /* get_last_shard */
     94 
     95     if (*start_row < shard_limit)
     96     {
     97       /* Claim fresh shard */
     98       enum GNUNET_DB_QueryStatus qs;
     99       struct GNUNET_TIME_Absolute now;
    100       struct GNUNET_PQ_QueryParam params[] = {
    101         GNUNET_PQ_query_param_string (job_name),
    102         GNUNET_PQ_query_param_absolute_time (&now),
    103         GNUNET_PQ_query_param_uint32 (start_row),
    104         GNUNET_PQ_query_param_uint32 (end_row),
    105         GNUNET_PQ_query_param_end
    106       };
    107 
    108       *end_row = GNUNET_MIN (shard_limit,
    109                              *start_row + shard_size - 1);
    110       now = GNUNET_TIME_absolute_get ();
    111       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    112                   "Trying to claim shard %llu-%llu\n",
    113                   (unsigned long long) *start_row,
    114                   (unsigned long long) *end_row);
    115 
    116       /* Used in #postgres_claim_revolving_shard() */
    117       PREPARE (pg,
    118                "create_revolving_shard",
    119                "INSERT INTO revolving_work_shards"
    120                "(job_name"
    121                ",last_attempt"
    122                ",start_row"
    123                ",end_row"
    124                ",active"
    125                ") VALUES "
    126                "($1, $2, $3, $4, TRUE);");
    127       qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
    128                                                "create_revolving_shard",
    129                                                params);
    130       switch (qs)
    131       {
    132       case GNUNET_DB_STATUS_HARD_ERROR:
    133         GNUNET_break (0);
    134         TALER_EXCHANGEDB_rollback (pg);
    135         return qs;
    136       case GNUNET_DB_STATUS_SOFT_ERROR:
    137         TALER_EXCHANGEDB_rollback (pg);
    138         continue;
    139       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    140         /* continued below (with commit) */
    141         break;
    142       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    143         /* someone else got this shard already,
    144            try again */
    145         TALER_EXCHANGEDB_rollback (pg);
    146         continue;
    147       }
    148     } /* end create fresh reovlving shard */
    149     else
    150     {
    151       /* claim oldest existing shard */
    152       enum GNUNET_DB_QueryStatus qs;
    153       struct GNUNET_PQ_QueryParam params[] = {
    154         GNUNET_PQ_query_param_string (job_name),
    155         GNUNET_PQ_query_param_end
    156       };
    157       struct GNUNET_PQ_ResultSpec rs[] = {
    158         GNUNET_PQ_result_spec_uint32 ("start_row",
    159                                       start_row),
    160         GNUNET_PQ_result_spec_uint32 ("end_row",
    161                                       end_row),
    162         GNUNET_PQ_result_spec_end
    163       };
    164 
    165       PREPARE (pg,
    166                "get_open_revolving_shard",
    167                "SELECT"
    168                " start_row"
    169                ",end_row"
    170                " FROM revolving_work_shards"
    171                " WHERE job_name=$1"
    172                "   AND active=FALSE"
    173                " ORDER BY last_attempt ASC"
    174                " LIMIT 1;");
    175       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
    176                                                      "get_open_revolving_shard",
    177                                                      params,
    178                                                      rs);
    179       switch (qs)
    180       {
    181       case GNUNET_DB_STATUS_HARD_ERROR:
    182         GNUNET_break (0);
    183         TALER_EXCHANGEDB_rollback (pg);
    184         return qs;
    185       case GNUNET_DB_STATUS_SOFT_ERROR:
    186         TALER_EXCHANGEDB_rollback (pg);
    187         continue;
    188       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    189         /* no open shards available */
    190         TALER_EXCHANGEDB_rollback (pg);
    191         return qs;
    192       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    193         {
    194           enum GNUNET_DB_QueryStatus qsz;
    195           struct GNUNET_TIME_Timestamp now;
    196           struct GNUNET_PQ_QueryParam iparams[] = {
    197             GNUNET_PQ_query_param_string (job_name),
    198             GNUNET_PQ_query_param_timestamp (&now),
    199             GNUNET_PQ_query_param_uint32 (start_row),
    200             GNUNET_PQ_query_param_uint32 (end_row),
    201             GNUNET_PQ_query_param_end
    202           };
    203 
    204           now = GNUNET_TIME_timestamp_get ();
    205           PREPARE (pg,
    206                    "reclaim_revolving_shard",
    207                    "UPDATE revolving_work_shards"
    208                    " SET last_attempt=$2"
    209                    "    ,active=TRUE"
    210                    " WHERE job_name=$1"
    211                    "   AND start_row=$3"
    212                    "   AND end_row=$4");
    213           qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
    214                                                     "reclaim_revolving_shard",
    215                                                     iparams);
    216           switch (qsz)
    217           {
    218           case GNUNET_DB_STATUS_HARD_ERROR:
    219             GNUNET_break (0);
    220             TALER_EXCHANGEDB_rollback (pg);
    221             return qs;
    222           case GNUNET_DB_STATUS_SOFT_ERROR:
    223             TALER_EXCHANGEDB_rollback (pg);
    224             continue;
    225           case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    226             break; /* continue with commit */
    227           case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    228             GNUNET_break (0); /* logic error, should be impossible */
    229             TALER_EXCHANGEDB_rollback (pg);
    230             return GNUNET_DB_STATUS_HARD_ERROR;
    231           }
    232         }
    233         break; /* continue with commit */
    234       }
    235     } /* end claim oldest existing shard */
    236 
    237     /* commit */
    238     {
    239       enum GNUNET_DB_QueryStatus qs;
    240 
    241       qs = TALER_EXCHANGEDB_commit (pg);
    242       switch (qs)
    243       {
    244       case GNUNET_DB_STATUS_HARD_ERROR:
    245         GNUNET_break (0);
    246         TALER_EXCHANGEDB_rollback (pg);
    247         return qs;
    248       case GNUNET_DB_STATUS_SOFT_ERROR:
    249         TALER_EXCHANGEDB_rollback (pg);
    250         continue;
    251       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    252       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    253         return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    254       }
    255     }
    256   } /* retry 'for' loop */
    257   return GNUNET_DB_STATUS_SOFT_ERROR;
    258 }