taler-merchant-wirewatch.c (20240B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2023, 2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file src/backend/taler-merchant-wirewatch.c 18 * @brief Process that imports information about incoming bank transfers into the merchant backend 19 * @author Christian Grothoff 20 */ 21 #include "platform.h" 22 #include "microhttpd.h" 23 #include <gnunet/gnunet_util_lib.h> 24 #include <jansson.h> 25 #include <pthread.h> 26 #include <taler/taler_dbevents.h> 27 #include "taler/taler_merchant_util.h" 28 #include "taler/taler_merchant_bank_lib.h" 29 #include "merchantdb_lib.h" 30 #include "merchantdb_lib.h" 31 #include "merchant-database/insert_transfer.h" 32 #include "merchant-database/select_wirewatch_accounts.h" 33 #include "merchant-database/update_wirewatch_progress.h" 34 #include "merchant-database/event_listen.h" 35 36 37 /** 38 * Timeout for the bank interaction. Rather long as we should do long-polling 39 * and do not want to wake up too often. 40 */ 41 #define BANK_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \ 42 5) 43 44 45 /** 46 * Information about a watch job. 47 */ 48 struct Watch 49 { 50 /** 51 * Kept in a DLL. 52 */ 53 struct Watch *next; 54 55 /** 56 * Kept in a DLL. 57 */ 58 struct Watch *prev; 59 60 /** 61 * Next task to run, if any. 62 */ 63 struct GNUNET_SCHEDULER_Task *task; 64 65 /** 66 * Dynamically adjusted long polling time-out. 67 */ 68 struct GNUNET_TIME_Relative bank_timeout; 69 70 /** 71 * For which instance are we importing bank transfers? 72 */ 73 char *instance_id; 74 75 /** 76 * For which account are we importing bank transfers? 77 */ 78 struct TALER_FullPayto payto_uri; 79 80 /** 81 * Bank history request. 82 */ 83 struct TALER_MERCHANT_BANK_CreditHistoryHandle *hh; 84 85 /** 86 * Start row for the bank interaction. Exclusive. 87 */ 88 uint64_t start_row; 89 90 /** 91 * Artificial delay to use between API calls. Used to 92 * throttle on failures. 93 */ 94 struct GNUNET_TIME_Relative delay; 95 96 /** 97 * When did we start our last HTTP request? 98 */ 99 struct GNUNET_TIME_Absolute start_time; 100 101 /** 102 * How long should long-polling take at least? 103 */ 104 struct GNUNET_TIME_Absolute long_poll_timeout; 105 106 /** 107 * Login data for the bank. 108 */ 109 struct TALER_MERCHANT_BANK_AuthenticationData ad; 110 111 /** 112 * Set to true if we found a transaction in the last iteration. 113 */ 114 bool found; 115 116 }; 117 118 119 /** 120 * Head of active watches. 121 */ 122 static struct Watch *w_head; 123 124 /** 125 * Tail of active watches. 126 */ 127 static struct Watch *w_tail; 128 129 /** 130 * The merchant's configuration. 131 */ 132 static const struct GNUNET_CONFIGURATION_Handle *cfg; 133 134 /** 135 * Our database connection. 136 */ 137 static struct TALER_MERCHANTDB_PostgresContext *pg; 138 139 /** 140 * Handle to the context for interacting with the bank. 141 */ 142 static struct GNUNET_CURL_Context *ctx; 143 144 /** 145 * Scheduler context for running the @e ctx. 146 */ 147 static struct GNUNET_CURL_RescheduleContext *rc; 148 149 /** 150 * Event handler to learn that the configuration changed 151 * and we should shutdown (to be restarted). 152 */ 153 static struct GNUNET_DB_EventHandler *eh; 154 155 /** 156 * Value to return from main(). 0 on success, non-zero on errors. 157 */ 158 static int global_ret; 159 160 /** 161 * How many transactions should we fetch at most per batch? 162 */ 163 static unsigned int batch_size = 32; 164 165 /** 166 * #GNUNET_YES if we are in test mode and should exit when idle. 167 */ 168 static int test_mode; 169 170 /** 171 * #GNUNET_YES if we are in persistent mode and do 172 * not exit on #config_changed. 173 */ 174 static int persist_mode; 175 176 /** 177 * Set to true if we are shutting down due to a 178 * configuration change. 179 */ 180 static bool config_changed_flag; 181 182 /** 183 * Save progress in DB. 184 */ 185 static void 186 save (struct Watch *w) 187 { 188 enum GNUNET_DB_QueryStatus qs; 189 190 qs = TALER_MERCHANTDB_update_wirewatch_progress (pg, 191 w->instance_id, 192 w->payto_uri, 193 w->start_row); 194 if (qs < 0) 195 { 196 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 197 "Failed to persist wirewatch progress for %s/%s (%d)\n", 198 w->instance_id, 199 w->payto_uri.full_payto, 200 qs); 201 GNUNET_SCHEDULER_shutdown (); 202 global_ret = EXIT_FAILURE; 203 } 204 } 205 206 207 /** 208 * Free resources of @a w. 209 * 210 * @param w watch job to terminate 211 */ 212 static void 213 end_watch (struct Watch *w) 214 { 215 if (NULL != w->task) 216 { 217 GNUNET_SCHEDULER_cancel (w->task); 218 w->task = NULL; 219 } 220 if (NULL != w->hh) 221 { 222 TALER_MERCHANT_BANK_credit_history_cancel (w->hh); 223 w->hh = NULL; 224 } 225 GNUNET_free (w->instance_id); 226 GNUNET_free (w->payto_uri.full_payto); 227 TALER_MERCHANT_BANK_auth_free (&w->ad); 228 GNUNET_CONTAINER_DLL_remove (w_head, 229 w_tail, 230 w); 231 GNUNET_free (w); 232 } 233 234 235 /** 236 * We're being aborted with CTRL-C (or SIGTERM). Shut down. 237 * 238 * @param cls closure 239 */ 240 static void 241 shutdown_task (void *cls) 242 { 243 (void) cls; 244 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 245 "Running shutdown\n"); 246 while (NULL != w_head) 247 { 248 struct Watch *w = w_head; 249 250 save (w); 251 end_watch (w); 252 } 253 if (NULL != eh) 254 { 255 TALER_MERCHANTDB_event_listen_cancel (eh); 256 eh = NULL; 257 } 258 if (NULL != pg) 259 { 260 TALER_MERCHANTDB_disconnect (pg); 261 pg = NULL; 262 } 263 cfg = NULL; 264 if (NULL != ctx) 265 { 266 GNUNET_CURL_fini (ctx); 267 ctx = NULL; 268 } 269 if (NULL != rc) 270 { 271 GNUNET_CURL_gnunet_rc_destroy (rc); 272 rc = NULL; 273 } 274 } 275 276 277 /** 278 * Parse @a subject from wire transfer into @a wtid and @a exchange_url. 279 * 280 * @param subject wire transfer subject to parse; 281 * format is "$WTID $URL" 282 * @param[out] wtid wire transfer ID to extract 283 * @param[out] exchange_url set to exchange URL 284 * @return #GNUNET_OK on success 285 */ 286 static enum GNUNET_GenericReturnValue 287 parse_subject (const char *subject, 288 struct TALER_WireTransferIdentifierRawP *wtid, 289 char **exchange_url) 290 { 291 const char *space; 292 293 space = strchr (subject, ' '); 294 if (NULL == space) 295 return GNUNET_NO; 296 if (GNUNET_OK != 297 GNUNET_STRINGS_string_to_data (subject, 298 space - subject, 299 wtid, 300 sizeof (*wtid))) 301 return GNUNET_NO; 302 space++; 303 if (! TALER_url_valid_charset (space)) 304 return GNUNET_NO; 305 if ( (0 != strncasecmp ("http://", 306 space, 307 strlen ("http://"))) && 308 (0 != strncasecmp ("https://", 309 space, 310 strlen ("https://"))) ) 311 return GNUNET_NO; 312 *exchange_url = GNUNET_strdup (space); 313 return GNUNET_OK; 314 } 315 316 317 /** 318 * Run next iteration. 319 * 320 * @param cls a `struct Watch *` 321 */ 322 static void 323 do_work (void *cls); 324 325 326 /** 327 * Callbacks of this type are used to serve the result of asking 328 * the bank for the credit transaction history. 329 * 330 * @param cls a `struct Watch *` 331 * @param http_status HTTP response code, #MHD_HTTP_OK (200) for successful status request 332 * 0 if the bank's reply is bogus (fails to follow the protocol), 333 * #MHD_HTTP_NO_CONTENT if there are no more results; on success the 334 * last callback is always of this status (even if `abs(num_results)` were 335 * already returned). 336 * @param ec detailed error code 337 * @param serial_id monotonically increasing counter corresponding to the transaction 338 * @param details details about the wire transfer 339 * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration 340 */ 341 static enum GNUNET_GenericReturnValue 342 credit_cb ( 343 void *cls, 344 unsigned int http_status, 345 enum TALER_ErrorCode ec, 346 uint64_t serial_id, 347 const struct TALER_MERCHANT_BANK_CreditDetails *details) 348 { 349 struct Watch *w = cls; 350 351 switch (http_status) 352 { 353 case 0: 354 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 355 "Invalid HTTP response (HTTP status: 0, %d) from bank\n", 356 ec); 357 w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); 358 break; 359 case MHD_HTTP_OK: 360 { 361 enum GNUNET_DB_QueryStatus qs; 362 char *exchange_url; 363 struct TALER_WireTransferIdentifierRawP wtid; 364 bool no_instance; 365 bool no_account; 366 bool conflict; 367 368 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 369 "Received wire transfer `%s' over %s\n", 370 details->wire_subject, 371 TALER_amount2s (&details->amount)); 372 w->found = true; 373 if (GNUNET_OK != 374 parse_subject (details->wire_subject, 375 &wtid, 376 &exchange_url)) 377 { 378 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 379 "Skipping transfer %llu (%s): not from exchange\n", 380 (unsigned long long) serial_id, 381 details->wire_subject); 382 w->start_row = serial_id; 383 return GNUNET_OK; 384 } 385 /* FIXME-Performance-Optimization: consider grouping multiple inserts 386 into one bigger transaction with just one notify. */ 387 qs = TALER_MERCHANTDB_insert_transfer (pg, 388 w->instance_id, 389 exchange_url, 390 &wtid, 391 &details->amount, 392 details->credit_account_uri, 393 serial_id, 394 &no_instance, 395 &no_account, 396 &conflict); 397 398 GNUNET_free (exchange_url); 399 if (qs < 0) 400 { 401 GNUNET_break (0); 402 GNUNET_SCHEDULER_shutdown (); 403 w->hh = NULL; 404 return GNUNET_SYSERR; 405 } 406 if (no_instance) 407 { 408 GNUNET_break (0); 409 GNUNET_SCHEDULER_shutdown (); 410 w->hh = NULL; 411 return GNUNET_SYSERR; 412 } 413 if (no_account) 414 { 415 GNUNET_break (0); 416 GNUNET_SCHEDULER_shutdown (); 417 w->hh = NULL; 418 return GNUNET_SYSERR; 419 } 420 if (conflict) 421 { 422 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 423 "Fatal: new wire transfer #%llu has same WTID but different amount %s compared to a previous transfer\n", 424 (unsigned long long) serial_id, 425 TALER_amount2s (&details->amount)); 426 GNUNET_SCHEDULER_shutdown (); 427 w->hh = NULL; 428 return GNUNET_SYSERR; 429 } 430 /* Success => reset back-off timer! */ 431 w->delay = GNUNET_TIME_UNIT_ZERO; 432 } 433 w->start_row = serial_id; 434 return GNUNET_OK; 435 case MHD_HTTP_NO_CONTENT: 436 save (w); 437 /* Delay artificially if server returned before long-poll timeout */ 438 if (! w->found) 439 w->delay = GNUNET_TIME_absolute_get_remaining (w->long_poll_timeout); 440 break; 441 case MHD_HTTP_NOT_FOUND: 442 /* configuration likely wrong, wait at least 1 minute, backoff up to 15 minutes! */ 443 w->delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MINUTES, 444 GNUNET_TIME_STD_BACKOFF (w->delay)); 445 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 446 "Bank claims account is unknown, waiting for %s before trying again\n", 447 GNUNET_TIME_relative2s (w->delay, 448 true)); 449 break; 450 case MHD_HTTP_GATEWAY_TIMEOUT: 451 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 452 "Gateway timeout, adjusting long polling threshold\n"); 453 /* Limit new timeout at request delay */ 454 w->bank_timeout 455 = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_duration ( 456 w->start_time), 457 w->bank_timeout); 458 /* set the timeout a bit earlier */ 459 w->bank_timeout 460 = GNUNET_TIME_relative_subtract (w->bank_timeout, 461 GNUNET_TIME_UNIT_SECONDS); 462 /* do not allow it to go to zero */ 463 w->bank_timeout 464 = GNUNET_TIME_relative_max (w->bank_timeout, 465 GNUNET_TIME_UNIT_SECONDS); 466 w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); 467 break; 468 default: 469 /* Something went wrong, try again, but with back-off */ 470 w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); 471 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 472 "Unexpected HTTP status code %u(%d) from wire gateway\n", 473 http_status, 474 ec); 475 break; 476 } 477 w->hh = NULL; 478 if (test_mode && (! w->found)) 479 { 480 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 481 "No transactions found and in test mode. Ending watch!\n"); 482 end_watch (w); 483 if (NULL == w_head) 484 GNUNET_SCHEDULER_shutdown (); 485 return GNUNET_OK; 486 } 487 w->task = GNUNET_SCHEDULER_add_delayed (w->delay, 488 &do_work, 489 w); 490 return GNUNET_OK; 491 } 492 493 494 static void 495 do_work (void *cls) 496 { 497 struct Watch *w = cls; 498 499 w->task = NULL; 500 w->found = false; 501 w->long_poll_timeout 502 = GNUNET_TIME_relative_to_absolute (w->bank_timeout); 503 w->start_time 504 = GNUNET_TIME_absolute_get (); 505 w->hh = TALER_MERCHANT_BANK_credit_history (ctx, 506 &w->ad, 507 w->start_row, 508 batch_size, 509 test_mode 510 ? GNUNET_TIME_UNIT_ZERO 511 : w->bank_timeout, 512 &credit_cb, 513 w); 514 if (NULL == w->hh) 515 { 516 GNUNET_break (0); 517 GNUNET_SCHEDULER_shutdown (); 518 return; 519 } 520 } 521 522 523 /** 524 * Function called with information about a accounts 525 * the wirewatcher should monitor. 526 * 527 * @param cls closure (NULL) 528 * @param instance instance that owns the account 529 * @param payto_uri account URI 530 * @param credit_facade_url URL for the credit facade 531 * @param credit_facade_credentials account access credentials 532 * @param last_serial last transaction serial (inclusive) we have seen from this account 533 */ 534 static void 535 start_watch ( 536 void *cls, 537 const char *instance, 538 struct TALER_FullPayto payto_uri, 539 const char *credit_facade_url, 540 const json_t *credit_facade_credentials, 541 uint64_t last_serial) 542 { 543 struct Watch *w = GNUNET_new (struct Watch); 544 545 (void) cls; 546 w->bank_timeout = BANK_TIMEOUT; 547 if (GNUNET_OK != 548 TALER_MERCHANT_BANK_auth_parse_json (credit_facade_credentials, 549 credit_facade_url, 550 &w->ad)) 551 { 552 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 553 "Failed to parse authentication data of `%s/%s'\n", 554 instance, 555 payto_uri.full_payto); 556 GNUNET_free (w); 557 GNUNET_SCHEDULER_shutdown (); 558 global_ret = EXIT_NOTCONFIGURED; 559 return; 560 } 561 562 GNUNET_CONTAINER_DLL_insert (w_head, 563 w_tail, 564 w); 565 w->instance_id = GNUNET_strdup (instance); 566 w->payto_uri.full_payto = GNUNET_strdup (payto_uri.full_payto); 567 w->start_row = last_serial; 568 w->task = GNUNET_SCHEDULER_add_now (&do_work, 569 w); 570 } 571 572 573 /** 574 * Function called on configuration change events received from Postgres. We 575 * shutdown (and systemd should restart us). 576 * 577 * @param cls closure (NULL) 578 * @param extra additional event data provided 579 * @param extra_size number of bytes in @a extra 580 */ 581 static void 582 config_changed (void *cls, 583 const void *extra, 584 size_t extra_size) 585 { 586 (void) cls; 587 (void) extra; 588 (void) extra_size; 589 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 590 "Configuration changed, %s\n", 591 0 == persist_mode 592 ? "restarting" 593 : "reinitializing"); 594 config_changed_flag = true; 595 GNUNET_SCHEDULER_shutdown (); 596 } 597 598 599 /** 600 * First task. 601 * 602 * @param cls closure, NULL 603 * @param args remaining command-line arguments 604 * @param cfgfile name of the configuration file used (for saving, can be NULL!) 605 * @param c configuration 606 */ 607 static void 608 run (void *cls, 609 char *const *args, 610 const char *cfgfile, 611 const struct GNUNET_CONFIGURATION_Handle *c) 612 { 613 (void) args; 614 (void) cfgfile; 615 616 cfg = c; 617 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 618 NULL); 619 ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, 620 &rc); 621 rc = GNUNET_CURL_gnunet_rc_create (ctx); 622 if (NULL == ctx) 623 { 624 GNUNET_break (0); 625 GNUNET_SCHEDULER_shutdown (); 626 global_ret = EXIT_FAILURE; 627 return; 628 } 629 if (NULL == 630 (pg = TALER_MERCHANTDB_connect (cfg))) 631 { 632 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 633 "Failed to initialize DB subsystem. Consider running taler-merchant-dbconfig\n"); 634 GNUNET_SCHEDULER_shutdown (); 635 global_ret = EXIT_FAILURE; 636 return; 637 } 638 { 639 struct GNUNET_DB_EventHeaderP es = { 640 .size = htons (sizeof (es)), 641 .type = htons (TALER_DBEVENT_MERCHANT_ACCOUNTS_CHANGED) 642 }; 643 644 eh = TALER_MERCHANTDB_event_listen (pg, 645 &es, 646 GNUNET_TIME_UNIT_FOREVER_REL, 647 &config_changed, 648 NULL); 649 } 650 { 651 enum GNUNET_DB_QueryStatus qs; 652 653 qs = TALER_MERCHANTDB_select_wirewatch_accounts (pg, 654 &start_watch, 655 NULL); 656 if (qs < 0) 657 { 658 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 659 "Failed to obtain wirewatch accounts from database\n"); 660 GNUNET_SCHEDULER_shutdown (); 661 global_ret = EXIT_NO_RESTART; 662 return; 663 } 664 if ( (NULL == w_head) && 665 (GNUNET_YES == test_mode) ) 666 { 667 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 668 "No active wirewatch accounts in database and in test mode. Exiting.\n"); 669 GNUNET_SCHEDULER_shutdown (); 670 global_ret = EXIT_SUCCESS; 671 return; 672 } 673 } 674 } 675 676 677 /** 678 * The main function of taler-merchant-wirewatch 679 * 680 * @param argc number of arguments from the command line 681 * @param argv command line arguments 682 * @return 0 ok, 1 on error 683 */ 684 int 685 main (int argc, 686 char *const *argv) 687 { 688 struct GNUNET_GETOPT_CommandLineOption options[] = { 689 GNUNET_GETOPT_option_flag ('p', 690 "persist", 691 "run in persist mode and do not exit on configuration changes", 692 &persist_mode), 693 GNUNET_GETOPT_option_timetravel ('T', 694 "timetravel"), 695 GNUNET_GETOPT_option_flag ('t', 696 "test", 697 "run in test mode and exit when idle", 698 &test_mode), 699 GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), 700 GNUNET_GETOPT_OPTION_END 701 }; 702 enum GNUNET_GenericReturnValue ret; 703 704 do { 705 config_changed_flag = false; 706 ret = GNUNET_PROGRAM_run ( 707 TALER_MERCHANT_project_data (), 708 argc, argv, 709 "taler-merchant-wirewatch", 710 gettext_noop ( 711 "background process that watches for incoming wire transfers to the merchant bank account"), 712 options, 713 &run, NULL); 714 } while ( (1 == persist_mode) && 715 config_changed_flag); 716 if (GNUNET_SYSERR == ret) 717 return EXIT_INVALIDARGUMENT; 718 if (GNUNET_NO == ret) 719 return EXIT_SUCCESS; 720 return global_ret; 721 } 722 723 724 /* end of taler-exchange-wirewatch.c */