taler-helper-auditor-transfer.c (16869B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2017-2024 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file auditor/taler-helper-auditor-transfer.c 18 * @brief audits that deposits past due date are 19 * aggregated and have a matching wire transfer 20 * database. 21 * @author Christian Grothoff 22 */ 23 #include "platform.h" 24 #include <gnunet/gnunet_util_lib.h> 25 #include <gnunet/gnunet_curl_lib.h> 26 #include "auditordb_lib.h" 27 #include "exchangedb_lib.h" 28 #include "taler/taler_json_lib.h" 29 #include "report-lib.h" 30 #include "taler/taler_dbevents.h" 31 #include "auditor-database/delete_early_aggregation.h" 32 #include "auditor-database/delete_pending_deposit.h" 33 #include "auditor-database/event_listen.h" 34 #include "auditor-database/get_auditor_progress.h" 35 #include "auditor-database/get_balance.h" 36 #include "auditor-database/insert_auditor_progress.h" 37 #include "auditor-database/insert_balance.h" 38 #include "auditor-database/insert_early_aggregation.h" 39 #include "auditor-database/insert_pending_deposit.h" 40 #include "auditor-database/preflight.h" 41 #include "auditor-database/start.h" 42 #include "auditor-database/update_auditor_progress.h" 43 #include "auditor-database/update_balance.h" 44 #include "exchange-database/preflight.h" 45 #include "exchange-database/rollback.h" 46 #include "exchange-database/select_aggregations_above_serial.h" 47 #include "exchange-database/select_batch_deposits_missing_wire.h" 48 #include "exchange-database/start_read_only.h" 49 50 51 /** 52 * Run in test mode. Exit when idle instead of 53 * going to sleep and waiting for more work. 54 */ 55 static int test_mode; 56 57 /** 58 * Return value from main(). 59 */ 60 static int global_ret; 61 62 /** 63 * Last reserve_out / wire_out serial IDs seen. 64 */ 65 static TALER_ARL_DEF_PP (wire_batch_deposit_id); 66 static TALER_ARL_DEF_PP (wire_aggregation_id); 67 68 /** 69 * Total amount which the exchange did not aggregate/transfer in time. 70 */ 71 static TALER_ARL_DEF_AB (total_amount_lag); 72 73 /** 74 * Total amount which the exchange did aggregate/transfer too early. 75 */ 76 static TALER_ARL_DEF_AB (total_early_aggregation); 77 78 /** 79 * Should we run checks that only work for exchange-internal audits? 80 */ 81 static int internal_checks; 82 83 /** 84 * Database event handler to wake us up again. 85 */ 86 static struct GNUNET_DB_EventHandler *eh; 87 88 /** 89 * The auditors's configuration. 90 */ 91 static const struct GNUNET_CONFIGURATION_Handle *cfg; 92 93 94 /** 95 * Task run on shutdown. 96 * 97 * @param cls NULL 98 */ 99 static void 100 do_shutdown (void *cls) 101 { 102 (void) cls; 103 if (NULL != eh) 104 { 105 TALER_AUDITORDB_event_listen_cancel (eh); 106 eh = NULL; 107 } 108 TALER_ARL_done (); 109 TALER_EXCHANGEDB_unload_accounts (); 110 TALER_ARL_cfg = NULL; 111 } 112 113 114 /** 115 * Closure for import_wire_missing_cb(). 116 */ 117 struct ImportMissingWireContext 118 { 119 /** 120 * Set to maximum row ID encountered. 121 */ 122 uint64_t max_batch_deposit_uuid; 123 124 /** 125 * Set to database errors in callback. 126 */ 127 enum GNUNET_DB_QueryStatus err; 128 }; 129 130 131 /** 132 * Function called on deposits that need to be checked for their 133 * wire transfer. 134 * 135 * @param cls closure, points to a `struct ImportMissingWireContext` 136 * @param batch_deposit_serial_id serial of the entry in the batch deposits table 137 * @param total_amount value of the missing deposits, including fee 138 * @param wire_target_h_payto where should the funds be wired 139 * @param deadline what was the earliest requested wire transfer deadline 140 */ 141 static void 142 import_wire_missing_cb ( 143 void *cls, 144 uint64_t batch_deposit_serial_id, 145 const struct TALER_Amount *total_amount, 146 const struct TALER_FullPaytoHashP *wire_target_h_payto, 147 struct GNUNET_TIME_Timestamp deadline) 148 { 149 struct ImportMissingWireContext *wc = cls; 150 enum GNUNET_DB_QueryStatus qs; 151 152 if (wc->err < 0) 153 return; /* already failed */ 154 GNUNET_assert (batch_deposit_serial_id >= wc->max_batch_deposit_uuid); 155 wc->max_batch_deposit_uuid = batch_deposit_serial_id + 1; 156 qs = TALER_AUDITORDB_delete_early_aggregation ( 157 TALER_ARL_adb, 158 batch_deposit_serial_id); 159 switch (qs) 160 { 161 case GNUNET_DB_STATUS_SOFT_ERROR: 162 GNUNET_break (0); 163 wc->err = qs; 164 return; 165 case GNUNET_DB_STATUS_HARD_ERROR: 166 GNUNET_break (0); 167 wc->err = qs; 168 return; 169 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 170 qs = TALER_AUDITORDB_insert_pending_deposit ( 171 TALER_ARL_adb, 172 batch_deposit_serial_id, 173 wire_target_h_payto, 174 total_amount, 175 deadline); 176 if (0 > qs) 177 { 178 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); 179 wc->err = qs; 180 } 181 TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_amount_lag), 182 &TALER_ARL_USE_AB (total_amount_lag), 183 total_amount); 184 break; 185 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 186 TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (total_early_aggregation), 187 &TALER_ARL_USE_AB (total_early_aggregation), 188 total_amount); 189 break; 190 default: 191 GNUNET_assert (0); 192 } 193 } 194 195 196 /** 197 * Checks for wire transfers that should have happened. 198 * 199 * @return transaction status 200 */ 201 static enum GNUNET_DB_QueryStatus 202 check_for_required_transfers (void) 203 { 204 enum GNUNET_DB_QueryStatus qs; 205 struct ImportMissingWireContext wc = { 206 .max_batch_deposit_uuid = TALER_ARL_USE_PP (wire_batch_deposit_id), 207 .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT 208 }; 209 210 qs = TALER_EXCHANGEDB_select_batch_deposits_missing_wire ( 211 TALER_ARL_edb, 212 TALER_ARL_USE_PP (wire_batch_deposit_id), 213 &import_wire_missing_cb, 214 &wc); 215 if (0 > qs) 216 { 217 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); 218 return qs; 219 } 220 if (0 > wc.err) 221 { 222 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == wc.err); 223 return wc.err; 224 } 225 TALER_ARL_USE_PP (wire_batch_deposit_id) = wc.max_batch_deposit_uuid; 226 return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 227 } 228 229 230 /** 231 * Closure for #clear_finished_transfer_cb(). 232 */ 233 struct AggregationContext 234 { 235 /** 236 * Set to maximum row ID encountered. 237 */ 238 uint64_t max_aggregation_serial; 239 240 /** 241 * Set to database errors in callback. 242 */ 243 enum GNUNET_DB_QueryStatus err; 244 }; 245 246 247 /** 248 * Function called on aggregations that were done for 249 * a (batch) deposit. 250 * 251 * @param cls closure 252 * @param amount affected amount 253 * @param tracking_serial_id where in the table are we 254 * @param batch_deposit_serial_id which batch deposit was aggregated 255 */ 256 static void 257 clear_finished_transfer_cb ( 258 void *cls, 259 const struct TALER_Amount *amount, 260 uint64_t tracking_serial_id, 261 uint64_t batch_deposit_serial_id) 262 { 263 struct AggregationContext *ac = cls; 264 enum GNUNET_DB_QueryStatus qs; 265 266 if (0 > ac->err) 267 return; /* already failed */ 268 GNUNET_assert (ac->max_aggregation_serial <= tracking_serial_id); 269 ac->max_aggregation_serial = tracking_serial_id + 1; 270 qs = TALER_AUDITORDB_delete_pending_deposit ( 271 TALER_ARL_adb, 272 batch_deposit_serial_id); 273 switch (qs) 274 { 275 case GNUNET_DB_STATUS_SOFT_ERROR: 276 GNUNET_break (0); 277 ac->err = qs; 278 return; 279 case GNUNET_DB_STATUS_HARD_ERROR: 280 GNUNET_break (0); 281 ac->err = qs; 282 return; 283 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 284 qs = TALER_AUDITORDB_insert_early_aggregation ( 285 TALER_ARL_adb, 286 batch_deposit_serial_id, 287 tracking_serial_id, 288 amount); 289 if (0 > qs) 290 { 291 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); 292 ac->err = qs; 293 return; 294 } 295 TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_early_aggregation), 296 &TALER_ARL_USE_AB (total_early_aggregation), 297 amount); 298 break; 299 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 300 TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (total_amount_lag), 301 &TALER_ARL_USE_AB (total_amount_lag), 302 amount); 303 break; 304 default: 305 GNUNET_assert (0); 306 } 307 } 308 309 310 /** 311 * Checks that all wire transfers that should have happened 312 * (based on deposits) have indeed happened. 313 * 314 * @return transaction status 315 */ 316 static enum GNUNET_DB_QueryStatus 317 check_for_completed_transfers (void) 318 { 319 struct AggregationContext ac = { 320 .max_aggregation_serial = TALER_ARL_USE_PP (wire_aggregation_id), 321 .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT 322 }; 323 enum GNUNET_DB_QueryStatus qs; 324 325 qs = TALER_EXCHANGEDB_select_aggregations_above_serial ( 326 TALER_ARL_edb, 327 TALER_ARL_USE_PP (wire_aggregation_id), 328 &clear_finished_transfer_cb, 329 &ac); 330 if (0 > qs) 331 { 332 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); 333 return qs; 334 } 335 if (0 > ac.err) 336 { 337 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == ac.err); 338 return ac.err; 339 } 340 TALER_ARL_USE_PP (wire_aggregation_id) = ac.max_aggregation_serial; 341 return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 342 } 343 344 345 /** 346 * Start the database transactions and begin the audit. 347 * 348 * @return transaction status 349 */ 350 static enum GNUNET_DB_QueryStatus 351 begin_transaction (void) 352 { 353 enum GNUNET_DB_QueryStatus qs; 354 355 if (GNUNET_SYSERR == 356 TALER_EXCHANGEDB_preflight (TALER_ARL_edb)) 357 { 358 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 359 "Failed to initialize exchange database connection.\n"); 360 return GNUNET_DB_STATUS_HARD_ERROR; 361 } 362 if (GNUNET_SYSERR == 363 TALER_AUDITORDB_preflight (TALER_ARL_adb)) 364 { 365 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 366 "Failed to initialize auditor database session.\n"); 367 return GNUNET_DB_STATUS_HARD_ERROR; 368 } 369 if (GNUNET_OK != 370 TALER_AUDITORDB_start (TALER_ARL_adb)) 371 { 372 GNUNET_break (0); 373 return GNUNET_DB_STATUS_HARD_ERROR; 374 } 375 if (GNUNET_OK != 376 TALER_TALER_EXCHANGEDB_start_read_only (TALER_ARL_edb, 377 "transfer auditor")) 378 { 379 GNUNET_break (0); 380 TALER_AUDITORDB_rollback (TALER_ARL_adb); 381 return GNUNET_DB_STATUS_HARD_ERROR; 382 } 383 qs = TALER_AUDITORDB_get_auditor_progress ( 384 TALER_ARL_adb, 385 TALER_ARL_GET_PP (wire_batch_deposit_id), 386 TALER_ARL_GET_PP (wire_aggregation_id), 387 NULL); 388 if (0 > qs) 389 goto handle_db_error; 390 391 qs = TALER_AUDITORDB_get_balance ( 392 TALER_ARL_adb, 393 TALER_ARL_GET_AB (total_amount_lag), 394 TALER_ARL_GET_AB (total_early_aggregation), 395 NULL); 396 if (0 > qs) 397 goto handle_db_error; 398 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) 399 { 400 GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, 401 "First analysis of with transfer auditor, starting audit from scratch\n"); 402 } 403 else 404 { 405 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 406 "Resuming transfer audit at %llu / %llu\n", 407 (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id), 408 (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id)); 409 } 410 411 qs = check_for_required_transfers (); 412 if (0 > qs) 413 goto handle_db_error; 414 qs = check_for_completed_transfers (); 415 if (0 > qs) 416 goto handle_db_error; 417 418 qs = TALER_AUDITORDB_update_auditor_progress ( 419 TALER_ARL_adb, 420 TALER_ARL_SET_PP (wire_batch_deposit_id), 421 TALER_ARL_SET_PP (wire_aggregation_id), 422 NULL); 423 if (0 > qs) 424 goto handle_db_error; 425 qs = TALER_AUDITORDB_insert_auditor_progress ( 426 TALER_ARL_adb, 427 TALER_ARL_SET_PP (wire_batch_deposit_id), 428 TALER_ARL_SET_PP (wire_aggregation_id), 429 NULL); 430 if (0 > qs) 431 goto handle_db_error; 432 qs = TALER_AUDITORDB_update_balance ( 433 TALER_ARL_adb, 434 TALER_ARL_SET_AB (total_amount_lag), 435 TALER_ARL_SET_AB (total_early_aggregation), 436 NULL); 437 if (0 > qs) 438 goto handle_db_error; 439 qs = TALER_AUDITORDB_insert_balance ( 440 TALER_ARL_adb, 441 TALER_ARL_SET_AB (total_amount_lag), 442 TALER_ARL_SET_AB (total_early_aggregation), 443 NULL); 444 if (0 > qs) 445 goto handle_db_error; 446 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 447 "Concluded audit step at %llu/%llu\n", 448 (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id), 449 (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id)); 450 TALER_EXCHANGEDB_rollback (TALER_ARL_edb); 451 qs = TALER_AUDITORDB_commit (TALER_ARL_adb); 452 if (0 > qs) 453 goto handle_db_error; 454 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 455 "Transaction concluded!\n"); 456 return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 457 handle_db_error: 458 TALER_AUDITORDB_rollback (TALER_ARL_adb); 459 TALER_EXCHANGEDB_rollback (TALER_ARL_edb); 460 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); 461 return qs; 462 } 463 464 465 /** 466 * Start auditor process. 467 */ 468 static void 469 start (void) 470 { 471 enum GNUNET_DB_QueryStatus qs; 472 473 for (unsigned int max_retries = 3; max_retries>0; max_retries--) 474 { 475 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 476 "Trying again (%u attempts left)\n", 477 max_retries); 478 qs = begin_transaction (); 479 if (GNUNET_DB_STATUS_SOFT_ERROR != qs) 480 break; 481 } 482 if (0 > qs) 483 { 484 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 485 "Audit failed\n"); 486 GNUNET_break (0); 487 global_ret = EXIT_FAILURE; 488 GNUNET_SCHEDULER_shutdown (); 489 return; 490 } 491 } 492 493 494 /** 495 * Function called on events received from Postgres. 496 * 497 * @param cls closure, NULL 498 * @param extra additional event data provided 499 * @param extra_size number of bytes in @a extra 500 */ 501 static void 502 db_notify (void *cls, 503 const void *extra, 504 size_t extra_size) 505 { 506 (void) cls; 507 (void) extra; 508 (void) extra_size; 509 510 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 511 "Received notification to wake transfer helper\n"); 512 start (); 513 } 514 515 516 /** 517 * Main function that will be run. 518 * 519 * @param cls closure 520 * @param args remaining command-line arguments 521 * @param cfgfile name of the configuration file used (for saving, can be NULL!) 522 * @param c configuration 523 */ 524 static void 525 run (void *cls, 526 char *const *args, 527 const char *cfgfile, 528 const struct GNUNET_CONFIGURATION_Handle *c) 529 { 530 (void) cls; 531 (void) args; 532 (void) cfgfile; 533 cfg = c; 534 if (GNUNET_OK != 535 TALER_ARL_init (c)) 536 { 537 global_ret = EXIT_FAILURE; 538 return; 539 } 540 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, 541 NULL); 542 if (GNUNET_OK != 543 TALER_EXCHANGEDB_load_accounts (TALER_ARL_cfg, 544 TALER_EXCHANGEDB_ALO_DEBIT 545 | TALER_EXCHANGEDB_ALO_CREDIT 546 | TALER_EXCHANGEDB_ALO_AUTHDATA)) 547 { 548 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 549 "No bank accounts configured\n"); 550 global_ret = EXIT_NOTCONFIGURED; 551 GNUNET_SCHEDULER_shutdown (); 552 return; 553 } 554 if (0 == test_mode) 555 { 556 // FIXME-Optimization: use different event type in the future! 557 struct GNUNET_DB_EventHeaderP es = { 558 .size = htons (sizeof (es)), 559 .type = htons (TALER_DBEVENT_EXCHANGE_AUDITOR_WAKE_HELPER_WIRE) 560 }; 561 562 eh = TALER_AUDITORDB_event_listen (TALER_ARL_adb, 563 &es, 564 GNUNET_TIME_UNIT_FOREVER_REL, 565 &db_notify, 566 NULL); 567 GNUNET_assert (NULL != eh); 568 } 569 start (); 570 } 571 572 573 /** 574 * The main function of the wire auditing tool. Checks that 575 * the exchange's records of wire transfers match that of 576 * the wire gateway. 577 * 578 * @param argc number of arguments from the command line 579 * @param argv command line arguments 580 * @return 0 ok, 1 on error 581 */ 582 int 583 main (int argc, 584 char *const *argv) 585 { 586 const struct GNUNET_GETOPT_CommandLineOption options[] = { 587 GNUNET_GETOPT_option_flag ('i', 588 "internal", 589 "perform checks only applicable for exchange-internal audits", 590 &internal_checks), 591 GNUNET_GETOPT_option_flag ('t', 592 "test", 593 "run in test mode and exit when idle", 594 &test_mode), 595 GNUNET_GETOPT_option_timetravel ('T', 596 "timetravel"), 597 GNUNET_GETOPT_OPTION_END 598 }; 599 enum GNUNET_GenericReturnValue ret; 600 601 ret = GNUNET_PROGRAM_run ( 602 TALER_AUDITOR_project_data (), 603 argc, 604 argv, 605 "taler-helper-auditor-transfer", 606 gettext_noop ( 607 "Audit exchange database for consistency of aggregations/transfers with respect to deposit deadlines"), 608 options, 609 &run, 610 NULL); 611 if (GNUNET_SYSERR == ret) 612 return EXIT_INVALIDARGUMENT; 613 if (GNUNET_NO == ret) 614 return EXIT_SUCCESS; 615 return global_ret; 616 } 617 618 619 /* end of taler-helper-auditor-transfer.c */