pg_reserves_in_insert.c (11847B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2022-2024 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file exchangedb/pg_reserves_in_insert.c 18 * @brief Implementation of the reserves_in_insert function for Postgres 19 * @author Christian Grothoff 20 * @author Joseph Xu 21 */ 22 #include "taler/taler_pq_lib.h" 23 #include "taler/exchange-database/reserves_in_insert.h" 24 #include "helper.h" 25 #include "taler/exchange-database/start.h" 26 #include "taler/exchange-database/start_read_committed.h" 27 #include "taler/exchange-database/commit.h" 28 #include "taler/exchange-database/preflight.h" 29 #include "taler/exchange-database/rollback.h" 30 31 32 /** 33 * Generate event notification for the reserve change. 34 * 35 * @param reserve_pub reserve to notfiy on 36 * @return string to pass to postgres for the notification 37 */ 38 static char * 39 compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub) 40 { 41 struct TALER_EXCHANGEDB_ReserveEventP rep = { 42 .header.size = htons (sizeof (rep)), 43 .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING), 44 .reserve_pub = *reserve_pub 45 }; 46 47 return GNUNET_PQ_get_event_notify_channel (&rep.header); 48 } 49 50 51 /** 52 * Closure for our helper_cb() 53 */ 54 struct Context 55 { 56 /** 57 * Array of reserve UUIDs to initialize. 58 */ 59 uint64_t *reserve_uuids; 60 61 /** 62 * Array with entries set to 'true' for duplicate transactions. 63 */ 64 bool *transaction_duplicates; 65 66 /** 67 * Array with entries set to 'true' for rows with conflicts. 68 */ 69 bool *conflicts; 70 71 /** 72 * Set to #GNUNET_SYSERR on failures. 73 */ 74 enum GNUNET_GenericReturnValue status; 75 76 /** 77 * Single value (no array) set to true if we need 78 * to follow-up with an update. 79 */ 80 bool needs_update; 81 }; 82 83 84 /** 85 * Helper function to be called with the results of a SELECT statement 86 * that has returned @a num_results results. 87 * 88 * @param cls closure of type `struct Context *` 89 * @param result the postgres result 90 * @param num_results the number of results in @a result 91 */ 92 static void 93 helper_cb (void *cls, 94 PGresult *result, 95 unsigned int num_results) 96 { 97 struct Context *ctx = cls; 98 99 for (unsigned int i = 0; i<num_results; i++) 100 { 101 struct GNUNET_PQ_ResultSpec rs[] = { 102 GNUNET_PQ_result_spec_bool ( 103 "transaction_duplicate", 104 &ctx->transaction_duplicates[i]), 105 GNUNET_PQ_result_spec_allow_null ( 106 GNUNET_PQ_result_spec_uint64 ("ruuid", 107 &ctx->reserve_uuids[i]), 108 &ctx->conflicts[i]), 109 GNUNET_PQ_result_spec_end 110 }; 111 112 if (GNUNET_OK != 113 GNUNET_PQ_extract_result (result, 114 rs, 115 i)) 116 { 117 GNUNET_break (0); 118 ctx->status = GNUNET_SYSERR; 119 return; 120 } 121 if (! ctx->transaction_duplicates[i]) 122 ctx->needs_update |= ctx->conflicts[i]; 123 } 124 } 125 126 127 enum GNUNET_DB_QueryStatus 128 TALER_EXCHANGEDB_reserves_in_insert ( 129 struct TALER_EXCHANGEDB_PostgresContext *pg, 130 const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, 131 unsigned int reserves_length, 132 enum GNUNET_DB_QueryStatus *results) 133 { 134 unsigned int dups = 0; 135 136 struct TALER_FullPaytoHashP h_full_paytos[ 137 GNUNET_NZL (reserves_length)]; 138 struct TALER_NormalizedPaytoHashP h_normalized_paytos[ 139 GNUNET_NZL (reserves_length)]; 140 char *notify_s[GNUNET_NZL (reserves_length)]; 141 struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)]; 142 struct TALER_Amount balances[GNUNET_NZL (reserves_length)]; 143 struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)]; 144 const char *sender_account_details[GNUNET_NZL (reserves_length)]; 145 const char *exchange_account_names[GNUNET_NZL (reserves_length)]; 146 uint64_t wire_references[GNUNET_NZL (reserves_length)]; 147 uint64_t reserve_uuids[GNUNET_NZL (reserves_length)]; 148 bool transaction_duplicates[GNUNET_NZL (reserves_length)]; 149 bool conflicts[GNUNET_NZL (reserves_length)]; 150 struct GNUNET_TIME_Timestamp reserve_expiration 151 = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); 152 struct GNUNET_TIME_Timestamp gc 153 = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time); 154 enum GNUNET_DB_QueryStatus qs; 155 bool need_update; 156 157 for (unsigned int i = 0; i<reserves_length; i++) 158 { 159 const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i]; 160 161 TALER_full_payto_hash (reserve->sender_account_details, 162 &h_full_paytos[i]); 163 TALER_full_payto_normalize_and_hash (reserve->sender_account_details, 164 &h_normalized_paytos[i]); 165 notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub); 166 reserve_pubs[i] = *reserve->reserve_pub; 167 balances[i] = *reserve->balance; 168 execution_times[i] = reserve->execution_time; 169 sender_account_details[i] = reserve->sender_account_details.full_payto; 170 exchange_account_names[i] = reserve->exchange_account_name; 171 wire_references[i] = reserve->wire_reference; 172 } 173 174 /* NOTE: kind-of pointless to explicitly start a transaction here... */ 175 if (GNUNET_OK != 176 TALER_EXCHANGEDB_preflight (pg)) 177 { 178 GNUNET_break (0); 179 qs = GNUNET_DB_STATUS_HARD_ERROR; 180 goto finished; 181 } 182 if (GNUNET_OK != 183 TALER_TALER_EXCHANGEDB_start_read_committed (pg, 184 "READ_COMMITED")) 185 { 186 GNUNET_break (0); 187 qs = GNUNET_DB_STATUS_HARD_ERROR; 188 goto finished; 189 } 190 PREPARE (pg, 191 "reserves_insert_with_array", 192 "SELECT" 193 " transaction_duplicate" 194 ",ruuid" 195 " FROM exchange_do_array_reserves_insert" 196 " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);"); 197 { 198 struct GNUNET_PQ_QueryParam params[] = { 199 GNUNET_PQ_query_param_timestamp (&gc), 200 GNUNET_PQ_query_param_timestamp (&reserve_expiration), 201 GNUNET_PQ_query_param_array_auto_from_type (reserves_length, 202 reserve_pubs, 203 pg->conn), 204 GNUNET_PQ_query_param_array_uint64 (reserves_length, 205 wire_references, 206 pg->conn), 207 TALER_PQ_query_param_array_amount ( 208 reserves_length, 209 balances, 210 pg->conn), 211 GNUNET_PQ_query_param_array_ptrs_string ( 212 reserves_length, 213 (const char **) exchange_account_names, 214 pg->conn), 215 GNUNET_PQ_query_param_array_timestamp ( 216 reserves_length, 217 execution_times, 218 pg->conn), 219 GNUNET_PQ_query_param_array_auto_from_type ( 220 reserves_length, 221 h_full_paytos, 222 pg->conn), 223 GNUNET_PQ_query_param_array_auto_from_type ( 224 reserves_length, 225 h_normalized_paytos, 226 pg->conn), 227 GNUNET_PQ_query_param_array_ptrs_string ( 228 reserves_length, 229 (const char **) sender_account_details, 230 pg->conn), 231 GNUNET_PQ_query_param_array_ptrs_string ( 232 reserves_length, 233 (const char **) notify_s, 234 pg->conn), 235 GNUNET_PQ_query_param_end 236 }; 237 struct Context ctx = { 238 .reserve_uuids = reserve_uuids, 239 .transaction_duplicates = transaction_duplicates, 240 .conflicts = conflicts, 241 .needs_update = false, 242 .status = GNUNET_OK 243 }; 244 245 qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, 246 "reserves_insert_with_array", 247 params, 248 &helper_cb, 249 &ctx); 250 GNUNET_PQ_cleanup_query_params_closures (params); 251 if ( (qs < 0) || 252 (GNUNET_OK != ctx.status) ) 253 { 254 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 255 "Failed to insert into reserves (%d)\n", 256 qs); 257 goto finished; 258 } 259 need_update = ctx.needs_update; 260 } 261 262 { 263 enum GNUNET_DB_QueryStatus cs; 264 265 cs = TALER_EXCHANGEDB_commit (pg); 266 if (cs < 0) 267 { 268 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 269 "Failed to commit\n"); 270 qs = cs; 271 goto finished; 272 } 273 } 274 275 for (unsigned int i = 0; i<reserves_length; i++) 276 { 277 if (transaction_duplicates[i]) 278 dups++; 279 results[i] = transaction_duplicates[i] 280 ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS 281 : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 282 } 283 284 if (! need_update) 285 { 286 qs = reserves_length; 287 goto finished; 288 } 289 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 290 "Reserve update needed for some reserves in the batch\n"); 291 PREPARE (pg, 292 "reserves_update", 293 "SELECT" 294 " out_duplicate AS duplicate " 295 "FROM exchange_do_batch_reserves_update" 296 " ($1,$2,$3,$4,$5,$6,$7);"); 297 298 if (GNUNET_OK != 299 TALER_EXCHANGEDB_start (pg, 300 "reserve-insert-continued")) 301 { 302 GNUNET_break (0); 303 qs = GNUNET_DB_STATUS_HARD_ERROR; 304 goto finished; 305 } 306 307 for (unsigned int i = 0; i<reserves_length; i++) 308 { 309 if (transaction_duplicates[i]) 310 continue; 311 if (! conflicts[i]) 312 continue; 313 { 314 bool duplicate; 315 struct GNUNET_PQ_QueryParam params[] = { 316 GNUNET_PQ_query_param_auto_from_type (&reserve_pubs[i]), 317 GNUNET_PQ_query_param_timestamp (&reserve_expiration), 318 GNUNET_PQ_query_param_uint64 (&wire_references[i]), 319 TALER_PQ_query_param_amount (pg->conn, 320 &balances[i]), 321 GNUNET_PQ_query_param_string (exchange_account_names[i]), 322 GNUNET_PQ_query_param_auto_from_type (&h_full_paytos[i]), 323 GNUNET_PQ_query_param_string (notify_s[i]), 324 GNUNET_PQ_query_param_end 325 }; 326 struct GNUNET_PQ_ResultSpec rs[] = { 327 GNUNET_PQ_result_spec_bool ("duplicate", 328 &duplicate), 329 GNUNET_PQ_result_spec_end 330 }; 331 enum GNUNET_DB_QueryStatus qsi; 332 333 qsi = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, 334 "reserves_update", 335 params, 336 rs); 337 if (qsi < 0) 338 { 339 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 340 "Failed to update reserves (%d)\n", 341 qsi); 342 results[i] = qsi; 343 goto finished; 344 } 345 results[i] = duplicate 346 ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS 347 : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 348 } 349 } 350 { 351 enum GNUNET_DB_QueryStatus cs; 352 353 cs = TALER_EXCHANGEDB_commit (pg); 354 if (cs < 0) 355 { 356 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 357 "Failed to commit\n"); 358 qs = cs; 359 goto finished; 360 } 361 } 362 finished: 363 for (unsigned int i = 0; i<reserves_length; i++) 364 GNUNET_free (notify_s[i]); 365 if (qs < 0) 366 return qs; 367 GNUNET_PQ_event_do_poll (pg->conn); 368 if (0 != dups) 369 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 370 "%u/%u duplicates among incoming transactions. Try increasing WIREWATCH_IDLE_SLEEP_INTERVAL in the [exchange] configuration section (if this happens a lot).\n", 371 dups, 372 reserves_length); 373 return qs; 374 }