pg_begin_revolving_shard.c (9227B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2022 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file exchangedb/pg_begin_revolving_shard.c 18 * @brief Implementation of the begin_revolving_shard function for Postgres 19 * @author Christian Grothoff 20 */ 21 #include "taler/taler_pq_lib.h" 22 #include "taler/exchange-database/begin_revolving_shard.h" 23 #include "taler/exchange-database/commit.h" 24 #include "helper.h" 25 #include "taler/exchange-database/start.h" 26 #include "taler/exchange-database/rollback.h" 27 28 enum GNUNET_DB_QueryStatus 29 TALER_EXCHANGEDB_begin_revolving_shard (struct 30 TALER_EXCHANGEDB_PostgresContext *pg, 31 const char *job_name, 32 uint32_t shard_size, 33 uint32_t shard_limit, 34 uint32_t *start_row, 35 uint32_t *end_row) 36 { 37 38 GNUNET_assert (shard_limit <= 1U + (uint32_t) INT_MAX); 39 GNUNET_assert (shard_limit > 0); 40 GNUNET_assert (shard_size > 0); 41 for (unsigned int retries = 0; retries<3; retries++) 42 { 43 if (GNUNET_OK != 44 TALER_EXCHANGEDB_start (pg, 45 "begin_revolving_shard")) 46 { 47 GNUNET_break (0); 48 return GNUNET_DB_STATUS_HARD_ERROR; 49 } 50 51 /* First, find last 'end_row' */ 52 { 53 enum GNUNET_DB_QueryStatus qs; 54 uint32_t last_end; 55 struct GNUNET_PQ_QueryParam params[] = { 56 GNUNET_PQ_query_param_string (job_name), 57 GNUNET_PQ_query_param_end 58 }; 59 struct GNUNET_PQ_ResultSpec rs[] = { 60 GNUNET_PQ_result_spec_uint32 ("end_row", 61 &last_end), 62 GNUNET_PQ_result_spec_end 63 }; 64 /* Used in #postgres_begin_revolving_shard() */ 65 PREPARE (pg, 66 "get_last_revolving_shard", 67 "SELECT" 68 " end_row" 69 " FROM revolving_work_shards" 70 " WHERE job_name=$1" 71 " ORDER BY end_row DESC" 72 " LIMIT 1;"); 73 qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, 74 "get_last_revolving_shard", 75 params, 76 rs); 77 switch (qs) 78 { 79 case GNUNET_DB_STATUS_HARD_ERROR: 80 GNUNET_break (0); 81 TALER_EXCHANGEDB_rollback (pg); 82 return qs; 83 case GNUNET_DB_STATUS_SOFT_ERROR: 84 TALER_EXCHANGEDB_rollback (pg); 85 continue; 86 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 87 *start_row = 1U + last_end; 88 break; 89 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 90 *start_row = 0; /* base-case: no shards yet */ 91 break; /* continued below */ 92 } 93 } /* get_last_shard */ 94 95 if (*start_row < shard_limit) 96 { 97 /* Claim fresh shard */ 98 enum GNUNET_DB_QueryStatus qs; 99 struct GNUNET_TIME_Absolute now; 100 struct GNUNET_PQ_QueryParam params[] = { 101 GNUNET_PQ_query_param_string (job_name), 102 GNUNET_PQ_query_param_absolute_time (&now), 103 GNUNET_PQ_query_param_uint32 (start_row), 104 GNUNET_PQ_query_param_uint32 (end_row), 105 GNUNET_PQ_query_param_end 106 }; 107 108 *end_row = GNUNET_MIN (shard_limit, 109 *start_row + shard_size - 1); 110 now = GNUNET_TIME_absolute_get (); 111 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 112 "Trying to claim shard %llu-%llu\n", 113 (unsigned long long) *start_row, 114 (unsigned long long) *end_row); 115 116 /* Used in #postgres_claim_revolving_shard() */ 117 PREPARE (pg, 118 "create_revolving_shard", 119 "INSERT INTO revolving_work_shards" 120 "(job_name" 121 ",last_attempt" 122 ",start_row" 123 ",end_row" 124 ",active" 125 ") VALUES " 126 "($1, $2, $3, $4, TRUE);"); 127 qs = GNUNET_PQ_eval_prepared_non_select (pg->conn, 128 "create_revolving_shard", 129 params); 130 switch (qs) 131 { 132 case GNUNET_DB_STATUS_HARD_ERROR: 133 GNUNET_break (0); 134 TALER_EXCHANGEDB_rollback (pg); 135 return qs; 136 case GNUNET_DB_STATUS_SOFT_ERROR: 137 TALER_EXCHANGEDB_rollback (pg); 138 continue; 139 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 140 /* continued below (with commit) */ 141 break; 142 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 143 /* someone else got this shard already, 144 try again */ 145 TALER_EXCHANGEDB_rollback (pg); 146 continue; 147 } 148 } /* end create fresh reovlving shard */ 149 else 150 { 151 /* claim oldest existing shard */ 152 enum GNUNET_DB_QueryStatus qs; 153 struct GNUNET_PQ_QueryParam params[] = { 154 GNUNET_PQ_query_param_string (job_name), 155 GNUNET_PQ_query_param_end 156 }; 157 struct GNUNET_PQ_ResultSpec rs[] = { 158 GNUNET_PQ_result_spec_uint32 ("start_row", 159 start_row), 160 GNUNET_PQ_result_spec_uint32 ("end_row", 161 end_row), 162 GNUNET_PQ_result_spec_end 163 }; 164 165 PREPARE (pg, 166 "get_open_revolving_shard", 167 "SELECT" 168 " start_row" 169 ",end_row" 170 " FROM revolving_work_shards" 171 " WHERE job_name=$1" 172 " AND active=FALSE" 173 " ORDER BY last_attempt ASC" 174 " LIMIT 1;"); 175 qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, 176 "get_open_revolving_shard", 177 params, 178 rs); 179 switch (qs) 180 { 181 case GNUNET_DB_STATUS_HARD_ERROR: 182 GNUNET_break (0); 183 TALER_EXCHANGEDB_rollback (pg); 184 return qs; 185 case GNUNET_DB_STATUS_SOFT_ERROR: 186 TALER_EXCHANGEDB_rollback (pg); 187 continue; 188 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 189 /* no open shards available */ 190 TALER_EXCHANGEDB_rollback (pg); 191 return qs; 192 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 193 { 194 enum GNUNET_DB_QueryStatus qsz; 195 struct GNUNET_TIME_Timestamp now; 196 struct GNUNET_PQ_QueryParam iparams[] = { 197 GNUNET_PQ_query_param_string (job_name), 198 GNUNET_PQ_query_param_timestamp (&now), 199 GNUNET_PQ_query_param_uint32 (start_row), 200 GNUNET_PQ_query_param_uint32 (end_row), 201 GNUNET_PQ_query_param_end 202 }; 203 204 now = GNUNET_TIME_timestamp_get (); 205 PREPARE (pg, 206 "reclaim_revolving_shard", 207 "UPDATE revolving_work_shards" 208 " SET last_attempt=$2" 209 " ,active=TRUE" 210 " WHERE job_name=$1" 211 " AND start_row=$3" 212 " AND end_row=$4"); 213 qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn, 214 "reclaim_revolving_shard", 215 iparams); 216 switch (qsz) 217 { 218 case GNUNET_DB_STATUS_HARD_ERROR: 219 GNUNET_break (0); 220 TALER_EXCHANGEDB_rollback (pg); 221 return qs; 222 case GNUNET_DB_STATUS_SOFT_ERROR: 223 TALER_EXCHANGEDB_rollback (pg); 224 continue; 225 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 226 break; /* continue with commit */ 227 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 228 GNUNET_break (0); /* logic error, should be impossible */ 229 TALER_EXCHANGEDB_rollback (pg); 230 return GNUNET_DB_STATUS_HARD_ERROR; 231 } 232 } 233 break; /* continue with commit */ 234 } 235 } /* end claim oldest existing shard */ 236 237 /* commit */ 238 { 239 enum GNUNET_DB_QueryStatus qs; 240 241 qs = TALER_EXCHANGEDB_commit (pg); 242 switch (qs) 243 { 244 case GNUNET_DB_STATUS_HARD_ERROR: 245 GNUNET_break (0); 246 TALER_EXCHANGEDB_rollback (pg); 247 return qs; 248 case GNUNET_DB_STATUS_SOFT_ERROR: 249 TALER_EXCHANGEDB_rollback (pg); 250 continue; 251 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 252 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 253 return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 254 } 255 } 256 } /* retry 'for' loop */ 257 return GNUNET_DB_STATUS_SOFT_ERROR; 258 }