begin_shard.c (9218B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2022 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file exchangedb/begin_shard.c 18 * @brief Implementation of the begin_shard function for Postgres 19 * @author Christian Grothoff 20 */ 21 #include "taler/taler_pq_lib.h" 22 #include "exchange-database/begin_shard.h" 23 #include "helper.h" 24 #include "exchange-database/start.h" 25 #include "exchange-database/rollback.h" 26 #include "exchange-database/commit.h" 27 28 29 enum GNUNET_DB_QueryStatus 30 TALER_EXCHANGEDB_begin_shard (struct TALER_EXCHANGEDB_PostgresContext *pg, 31 const char *job_name, 32 struct GNUNET_TIME_Relative delay, 33 uint64_t shard_size, 34 uint64_t *start_row, 35 uint64_t *end_row) 36 { 37 38 for (unsigned int retries = 0; retries<10; retries++) 39 { 40 if (GNUNET_OK != 41 TALER_EXCHANGEDB_start (pg, 42 "begin_shard")) 43 { 44 GNUNET_break (0); 45 return GNUNET_DB_STATUS_HARD_ERROR; 46 } 47 48 { 49 struct GNUNET_TIME_Absolute past; 50 enum GNUNET_DB_QueryStatus qs; 51 struct GNUNET_PQ_QueryParam params[] = { 52 GNUNET_PQ_query_param_string (job_name), 53 GNUNET_PQ_query_param_absolute_time (&past), 54 GNUNET_PQ_query_param_end 55 }; 56 struct GNUNET_PQ_ResultSpec rs[] = { 57 GNUNET_PQ_result_spec_uint64 ("start_row", 58 start_row), 59 GNUNET_PQ_result_spec_uint64 ("end_row", 60 end_row), 61 GNUNET_PQ_result_spec_end 62 }; 63 64 past = GNUNET_TIME_absolute_get (); 65 PREPARE (pg, 66 "get_open_shard", 67 "SELECT" 68 " start_row" 69 ",end_row" 70 " FROM work_shards" 71 " WHERE job_name=$1" 72 " AND completed=FALSE" 73 " AND last_attempt<$2" 74 " ORDER BY last_attempt ASC" 75 " LIMIT 1;"); 76 qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, 77 "get_open_shard", 78 params, 79 rs); 80 switch (qs) 81 { 82 case GNUNET_DB_STATUS_HARD_ERROR: 83 GNUNET_break (0); 84 TALER_EXCHANGEDB_rollback (pg); 85 return qs; 86 case GNUNET_DB_STATUS_SOFT_ERROR: 87 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 88 "Serialization error on getting open shard\n"); 89 TALER_EXCHANGEDB_rollback (pg); 90 continue; 91 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 92 { 93 enum GNUNET_DB_QueryStatus qsz; 94 struct GNUNET_TIME_Absolute now; 95 struct GNUNET_PQ_QueryParam iparams[] = { 96 GNUNET_PQ_query_param_string (job_name), 97 GNUNET_PQ_query_param_absolute_time (&now), 98 GNUNET_PQ_query_param_uint64 (start_row), 99 GNUNET_PQ_query_param_uint64 (end_row), 100 GNUNET_PQ_query_param_end 101 }; 102 103 now = GNUNET_TIME_relative_to_absolute (delay); 104 PREPARE (pg, 105 "reclaim_shard", 106 "UPDATE work_shards" 107 " SET last_attempt=$2" 108 " WHERE job_name=$1" 109 " AND start_row=$3" 110 " AND end_row=$4"); 111 qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn, 112 "reclaim_shard", 113 iparams); 114 switch (qsz) 115 { 116 case GNUNET_DB_STATUS_HARD_ERROR: 117 GNUNET_break (0); 118 TALER_EXCHANGEDB_rollback (pg); 119 return qsz; 120 case GNUNET_DB_STATUS_SOFT_ERROR: 121 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 122 "Serialization error on claiming open shard\n"); 123 TALER_EXCHANGEDB_rollback (pg); 124 continue; 125 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 126 goto commit; 127 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 128 GNUNET_break (0); /* logic error, should be impossible */ 129 TALER_EXCHANGEDB_rollback (pg); 130 return GNUNET_DB_STATUS_HARD_ERROR; 131 } 132 } 133 break; /* actually unreachable */ 134 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 135 break; /* continued below */ 136 } 137 } /* get_open_shard */ 138 139 /* No open shard, find last 'end_row' */ 140 { 141 enum GNUNET_DB_QueryStatus qs; 142 struct GNUNET_PQ_QueryParam params[] = { 143 GNUNET_PQ_query_param_string (job_name), 144 GNUNET_PQ_query_param_end 145 }; 146 struct GNUNET_PQ_ResultSpec rs[] = { 147 GNUNET_PQ_result_spec_uint64 ("end_row", 148 start_row), 149 GNUNET_PQ_result_spec_end 150 }; 151 152 PREPARE (pg, 153 "get_last_shard", 154 "SELECT" 155 " end_row" 156 " FROM work_shards" 157 " WHERE job_name=$1" 158 " ORDER BY end_row DESC" 159 " LIMIT 1;"); 160 qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, 161 "get_last_shard", 162 params, 163 rs); 164 switch (qs) 165 { 166 case GNUNET_DB_STATUS_HARD_ERROR: 167 GNUNET_break (0); 168 TALER_EXCHANGEDB_rollback (pg); 169 return qs; 170 case GNUNET_DB_STATUS_SOFT_ERROR: 171 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 172 "Serialization error on getting last shard\n"); 173 TALER_EXCHANGEDB_rollback (pg); 174 continue; 175 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 176 break; 177 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 178 *start_row = 0; /* base-case: no shards yet */ 179 break; /* continued below */ 180 } 181 *end_row = *start_row + shard_size; 182 } /* get_last_shard */ 183 184 /* Claim fresh shard */ 185 { 186 enum GNUNET_DB_QueryStatus qs; 187 struct GNUNET_TIME_Absolute now; 188 struct GNUNET_PQ_QueryParam params[] = { 189 GNUNET_PQ_query_param_string (job_name), 190 GNUNET_PQ_query_param_absolute_time (&now), 191 GNUNET_PQ_query_param_uint64 (start_row), 192 GNUNET_PQ_query_param_uint64 (end_row), 193 GNUNET_PQ_query_param_end 194 }; 195 196 now = GNUNET_TIME_relative_to_absolute (delay); 197 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 198 "Trying to claim shard (%llu-%llu]\n", 199 (unsigned long long) *start_row, 200 (unsigned long long) *end_row); 201 202 PREPARE (pg, 203 "claim_next_shard", 204 "INSERT INTO work_shards" 205 "(job_name" 206 ",last_attempt" 207 ",start_row" 208 ",end_row" 209 ") VALUES " 210 "($1, $2, $3, $4);"); 211 qs = GNUNET_PQ_eval_prepared_non_select (pg->conn, 212 "claim_next_shard", 213 params); 214 switch (qs) 215 { 216 case GNUNET_DB_STATUS_HARD_ERROR: 217 GNUNET_break (0); 218 TALER_EXCHANGEDB_rollback (pg); 219 return qs; 220 case GNUNET_DB_STATUS_SOFT_ERROR: 221 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 222 "Serialization error on claiming next shard\n"); 223 TALER_EXCHANGEDB_rollback (pg); 224 continue; 225 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 226 /* continued below */ 227 break; 228 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 229 /* someone else got this shard already, 230 try again */ 231 TALER_EXCHANGEDB_rollback (pg); 232 continue; 233 } 234 } /* claim_next_shard */ 235 236 /* commit */ 237 commit: 238 { 239 enum GNUNET_DB_QueryStatus qs; 240 241 qs = TALER_EXCHANGEDB_commit (pg); 242 switch (qs) 243 { 244 case GNUNET_DB_STATUS_HARD_ERROR: 245 GNUNET_break (0); 246 TALER_EXCHANGEDB_rollback (pg); 247 return qs; 248 case GNUNET_DB_STATUS_SOFT_ERROR: 249 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 250 "Serialization error on commit for beginning shard\n"); 251 TALER_EXCHANGEDB_rollback (pg); 252 continue; 253 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 254 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 255 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 256 "Claimed new shard\n"); 257 return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 258 } 259 } 260 } /* retry 'for' loop */ 261 return GNUNET_DB_STATUS_SOFT_ERROR; 262 }