/* taler-merchant-reconciliation.c (35985B) */
1 /* 2 This file is part of TALER 3 Copyright (C) 2023-2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file src/backend/taler-merchant-reconciliation.c 18 * @brief Process that reconciles information about incoming bank transfers with orders by asking the exchange 19 * @author Christian Grothoff 20 */ 21 #include "platform.h" 22 #include "microhttpd.h" 23 #include <gnunet/gnunet_util_lib.h> 24 #include <jansson.h> 25 #include <pthread.h> 26 #include <taler/taler_dbevents.h> 27 #include "taler/taler_merchant_util.h" 28 #include "taler/taler_merchant_bank_lib.h" 29 #include "merchantdb_lib.h" 30 #include "merchantdb_lib.h" 31 #include "merchant-database/finalize_transfer_status.h" 32 #include "merchant-database/insert_transfer_details.h" 33 #include "merchant-database/lookup_deposits_by_contract_and_coin.h" 34 #include "merchant-database/lookup_wire_fee.h" 35 #include "merchant-database/select_exchange_keys.h" 36 #include "merchant-database/select_open_transfers.h" 37 #include "merchant-database/update_transfer_status.h" 38 #include "merchant-database/event_listen.h" 39 #include "merchant-database/preflight.h" 40 41 /** 42 * Timeout for the exchange interaction. Rather long as we should do 43 * long-polling and do not want to wake up too often. 
44 */ 45 #define EXCHANGE_TIMEOUT GNUNET_TIME_relative_multiply ( \ 46 GNUNET_TIME_UNIT_MINUTES, \ 47 30) 48 49 /** 50 * How many inquiries do we process concurrently at most. 51 */ 52 #define OPEN_INQUIRY_LIMIT 1024 53 54 /** 55 * How many inquiries do we process concurrently per exchange at most. 56 */ 57 #define EXCHANGE_INQUIRY_LIMIT 16 58 59 60 /** 61 * Information about an inquiry job. 62 */ 63 struct Inquiry; 64 65 66 /** 67 * Information about an exchange. 68 */ 69 struct Exchange 70 { 71 /** 72 * Kept in a DLL. 73 */ 74 struct Exchange *next; 75 76 /** 77 * Kept in a DLL. 78 */ 79 struct Exchange *prev; 80 81 /** 82 * Head of active inquiries. 83 */ 84 struct Inquiry *w_head; 85 86 /** 87 * Tail of active inquiries. 88 */ 89 struct Inquiry *w_tail; 90 91 /** 92 * Which exchange are we tracking here. 93 */ 94 char *exchange_url; 95 96 /** 97 * The keys of this exchange 98 */ 99 struct TALER_EXCHANGE_Keys *keys; 100 101 /** 102 * How many active inquiries do we have right now with this exchange. 103 */ 104 unsigned int exchange_inquiries; 105 106 /** 107 * How long should we wait between requests 108 * for transfer details? 109 */ 110 struct GNUNET_TIME_Relative transfer_delay; 111 112 }; 113 114 115 /** 116 * Information about an inquiry job. 117 */ 118 struct Inquiry 119 { 120 /** 121 * Kept in a DLL. 122 */ 123 struct Inquiry *next; 124 125 /** 126 * Kept in a DLL. 127 */ 128 struct Inquiry *prev; 129 130 /** 131 * Handle to the exchange that made the transfer. 132 */ 133 struct Exchange *exchange; 134 135 /** 136 * Task where we retry fetching transfer details from the exchange. 137 */ 138 struct GNUNET_SCHEDULER_Task *task; 139 140 /** 141 * For which merchant instance is this tracking request? 142 */ 143 char *instance_id; 144 145 /** 146 * payto:// URI used for the transfer. 147 */ 148 struct TALER_FullPayto payto_uri; 149 150 /** 151 * Handle for the GET /transfers request. 
152 */ 153 struct TALER_EXCHANGE_GetTransfersHandle *wdh; 154 155 /** 156 * When did the transfer happen? 157 */ 158 struct GNUNET_TIME_Timestamp execution_time; 159 160 /** 161 * Argument for the /wire/transfers request. 162 */ 163 struct TALER_WireTransferIdentifierRawP wtid; 164 165 /** 166 * Row of the wire transfer in our database. 167 */ 168 uint64_t rowid; 169 170 }; 171 172 173 /** 174 * Head of known exchanges. 175 */ 176 static struct Exchange *e_head; 177 178 /** 179 * Tail of known exchanges. 180 */ 181 static struct Exchange *e_tail; 182 183 /** 184 * The merchant's configuration. 185 */ 186 static const struct GNUNET_CONFIGURATION_Handle *cfg; 187 188 /** 189 * Our database connection. 190 */ 191 static struct TALER_MERCHANTDB_PostgresContext *pg; 192 193 /** 194 * Handle to the context for interacting with the bank. 195 */ 196 static struct GNUNET_CURL_Context *ctx; 197 198 /** 199 * Scheduler context for running the @e ctx. 200 */ 201 static struct GNUNET_CURL_RescheduleContext *rc; 202 203 /** 204 * Main task for #find_work(). 205 */ 206 static struct GNUNET_SCHEDULER_Task *task; 207 208 /** 209 * Event handler to learn that there are new transfers 210 * to check. 211 */ 212 static struct GNUNET_DB_EventHandler *eh; 213 214 /** 215 * Event handler to learn that there may be new exchange 216 * keys to check. 217 */ 218 static struct GNUNET_DB_EventHandler *eh_keys; 219 220 /** 221 * How many active inquiries do we have right now. 222 */ 223 static unsigned int active_inquiries; 224 225 /** 226 * Set to true if we ever encountered any problem. 227 */ 228 static bool found_problem; 229 230 /** 231 * Value to return from main(). 0 on success, non-zero on errors. 232 */ 233 static int global_ret; 234 235 /** 236 * #GNUNET_YES if we are in test mode and should exit when idle. 
237 */ 238 static int test_mode; 239 240 /** 241 * True if the last DB query was limited by the 242 * #OPEN_INQUIRY_LIMIT and we thus should check again 243 * as soon as we are substantially below that limit, 244 * and not only when we get a DB notification. 245 */ 246 static bool at_limit; 247 248 249 /** 250 * Initiate download from exchange. 251 * 252 * @param cls a `struct Inquiry *` 253 */ 254 static void 255 exchange_request (void *cls); 256 257 258 /** 259 * The exchange @a e is ready to handle more inquiries, 260 * prepare to launch them. 261 * 262 * @param[in,out] e exchange to potentially launch inquiries on 263 */ 264 static void 265 launch_inquiries_at_exchange (struct Exchange *e) 266 { 267 for (struct Inquiry *w = e->w_head; 268 NULL != w; 269 w = w->next) 270 { 271 if (e->exchange_inquiries >= EXCHANGE_INQUIRY_LIMIT) 272 break; 273 if ( (NULL == w->task) && 274 (NULL == w->wdh) ) 275 { 276 e->exchange_inquiries++; 277 w->task = GNUNET_SCHEDULER_add_now (&exchange_request, 278 w); 279 } 280 } 281 } 282 283 284 /** 285 * Updates the transaction status for inquiry @a w to the given values. 
286 * 287 * @param w inquiry to update status for 288 * @param next_attempt when should we retry @a w (if ever) 289 * @param http_status HTTP status of the response 290 * @param ec error code to use (if any) 291 * @param last_hint hint delivered with the response (if any, possibly NULL) 292 * @param needs_retry true if we should try the HTTP request again 293 */ 294 static void 295 update_transaction_status (const struct Inquiry *w, 296 struct GNUNET_TIME_Absolute next_attempt, 297 unsigned int http_status, 298 enum TALER_ErrorCode ec, 299 const char *last_hint, 300 bool needs_retry) 301 { 302 enum GNUNET_DB_QueryStatus qs; 303 304 qs = TALER_MERCHANTDB_update_transfer_status (pg, 305 w->exchange->exchange_url, 306 &w->wtid, 307 next_attempt, 308 http_status, 309 ec, 310 last_hint, 311 needs_retry); 312 if (qs < 0) 313 { 314 GNUNET_break (0); 315 global_ret = EXIT_FAILURE; 316 GNUNET_SCHEDULER_shutdown (); 317 return; 318 } 319 } 320 321 322 /** 323 * Interact with the database to get the current set 324 * of exchange keys known to us. 325 * 326 * @param e the exchange to check 327 */ 328 static void 329 sync_keys (struct Exchange *e) 330 { 331 enum GNUNET_DB_QueryStatus qs; 332 struct TALER_EXCHANGE_Keys *keys; 333 struct GNUNET_TIME_Absolute first_retry; 334 335 qs = TALER_MERCHANTDB_select_exchange_keys (pg, 336 e->exchange_url, 337 &first_retry, 338 &keys); 339 if (qs < 0) 340 { 341 GNUNET_break (0); 342 return; 343 } 344 if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) || 345 (NULL == keys) ) 346 { 347 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 348 "Cannot launch inquiries at `%s': lacking /keys response\n", 349 e->exchange_url); 350 return; 351 } 352 TALER_EXCHANGE_keys_decref (e->keys); 353 e->keys = keys; 354 launch_inquiries_at_exchange (e); 355 } 356 357 358 /** 359 * Lookup our internal data structure for the given 360 * @a exchange_url or create one if we do not yet have 361 * one. 
362 * 363 * @param exchange_url base URL of the exchange 364 * @return our state for this exchange 365 */ 366 static struct Exchange * 367 find_exchange (const char *exchange_url) 368 { 369 struct Exchange *e; 370 371 for (e = e_head; NULL != e; e = e->next) 372 if (0 == strcmp (exchange_url, 373 e->exchange_url)) 374 return e; 375 e = GNUNET_new (struct Exchange); 376 e->exchange_url = GNUNET_strdup (exchange_url); 377 GNUNET_CONTAINER_DLL_insert (e_head, 378 e_tail, 379 e); 380 sync_keys (e); 381 return e; 382 } 383 384 385 /** 386 * Finds new transfers that require work in the merchant database. 387 * 388 * @param cls NULL 389 */ 390 static void 391 find_work (void *cls); 392 393 394 /** 395 * Free resources of @a w. 396 * 397 * @param[in] w inquiry job to terminate 398 */ 399 static void 400 end_inquiry (struct Inquiry *w) 401 { 402 struct Exchange *e = w->exchange; 403 404 GNUNET_assert (active_inquiries > 0); 405 active_inquiries--; 406 if (NULL != w->wdh) 407 { 408 TALER_EXCHANGE_get_transfers_cancel (w->wdh); 409 w->wdh = NULL; 410 } 411 GNUNET_free (w->instance_id); 412 GNUNET_free (w->payto_uri.full_payto); 413 GNUNET_CONTAINER_DLL_remove (e->w_head, 414 e->w_tail, 415 w); 416 GNUNET_free (w); 417 if ( (active_inquiries < OPEN_INQUIRY_LIMIT / 2) && 418 (NULL == task) && 419 (at_limit) ) 420 { 421 at_limit = false; 422 GNUNET_assert (NULL == task); 423 task = GNUNET_SCHEDULER_add_now (&find_work, 424 NULL); 425 } 426 if ( (NULL == task) && 427 (! at_limit) && 428 (0 == active_inquiries) && 429 (test_mode) ) 430 { 431 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 432 "No more open inquiries and in test mode. Exiting.\n"); 433 GNUNET_SCHEDULER_shutdown (); 434 return; 435 } 436 } 437 438 439 /** 440 * We're being aborted with CTRL-C (or SIGTERM). Shut down. 
441 * 442 * @param cls closure (NULL) 443 */ 444 static void 445 shutdown_task (void *cls) 446 { 447 (void) cls; 448 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 449 "Running shutdown\n"); 450 while (NULL != e_head) 451 { 452 struct Exchange *e = e_head; 453 454 while (NULL != e->w_head) 455 { 456 struct Inquiry *w = e->w_head; 457 458 end_inquiry (w); 459 } 460 GNUNET_free (e->exchange_url); 461 if (NULL != e->keys) 462 { 463 TALER_EXCHANGE_keys_decref (e->keys); 464 e->keys = NULL; 465 } 466 GNUNET_CONTAINER_DLL_remove (e_head, 467 e_tail, 468 e); 469 GNUNET_free (e); 470 } 471 if (NULL != eh) 472 { 473 TALER_MERCHANTDB_event_listen_cancel (eh); 474 eh = NULL; 475 } 476 if (NULL != eh_keys) 477 { 478 TALER_MERCHANTDB_event_listen_cancel (eh_keys); 479 eh_keys = NULL; 480 } 481 if (NULL != task) 482 { 483 GNUNET_SCHEDULER_cancel (task); 484 task = NULL; 485 } 486 if (NULL != pg) 487 { 488 TALER_MERCHANTDB_disconnect (pg); 489 pg = NULL; 490 } 491 cfg = NULL; 492 if (NULL != ctx) 493 { 494 GNUNET_CURL_fini (ctx); 495 ctx = NULL; 496 } 497 if (NULL != rc) 498 { 499 GNUNET_CURL_gnunet_rc_destroy (rc); 500 rc = NULL; 501 } 502 } 503 504 505 /** 506 * Check that the given @a wire_fee is what the @a e should charge 507 * at the @a execution_time. If the fee is correct (according to our 508 * database), return #GNUNET_OK. If we do not have the fee structure in our 509 * DB, we just accept it and return #GNUNET_NO; if we have proof that the fee 510 * is bogus, we respond with the proof to the client and return 511 * #GNUNET_SYSERR. 
512 * 513 * @param w inquiry to check fees of 514 * @param execution_time time of the wire transfer 515 * @param wire_fee fee claimed by the exchange 516 * @return #GNUNET_SYSERR if we returned hard proof of 517 * missbehavior from the exchange to the client 518 */ 519 static enum GNUNET_GenericReturnValue 520 check_wire_fee (struct Inquiry *w, 521 struct GNUNET_TIME_Timestamp execution_time, 522 const struct TALER_Amount *wire_fee) 523 { 524 struct Exchange *e = w->exchange; 525 const struct TALER_EXCHANGE_Keys *keys = e->keys; 526 struct TALER_WireFeeSet fees; 527 struct TALER_MasterSignatureP master_sig; 528 struct GNUNET_TIME_Timestamp start_date; 529 struct GNUNET_TIME_Timestamp end_date; 530 enum GNUNET_DB_QueryStatus qs; 531 char *wire_method; 532 533 if (NULL == keys) 534 { 535 GNUNET_break (0); 536 return GNUNET_NO; 537 } 538 wire_method = TALER_payto_get_method (w->payto_uri.full_payto); 539 qs = TALER_MERCHANTDB_lookup_wire_fee (pg, 540 &keys->master_pub, 541 wire_method, 542 execution_time, 543 &fees, 544 &start_date, 545 &end_date, 546 &master_sig); 547 switch (qs) 548 { 549 case GNUNET_DB_STATUS_HARD_ERROR: 550 GNUNET_break (0); 551 GNUNET_free (wire_method); 552 return GNUNET_SYSERR; 553 case GNUNET_DB_STATUS_SOFT_ERROR: 554 GNUNET_free (wire_method); 555 return GNUNET_NO; 556 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 557 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 558 "Failed to find wire fee for `%s' and method `%s' at %s in DB, accepting blindly that the fee is %s\n", 559 TALER_B2S (&keys->master_pub), 560 wire_method, 561 GNUNET_TIME_timestamp2s (execution_time), 562 TALER_amount2s (wire_fee)); 563 GNUNET_free (wire_method); 564 return GNUNET_OK; 565 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 566 break; 567 } 568 if ( (GNUNET_OK != 569 TALER_amount_cmp_currency (&fees.wire, 570 wire_fee)) || 571 (0 > TALER_amount_cmp (&fees.wire, 572 wire_fee)) ) 573 { 574 GNUNET_break_op (0); 575 GNUNET_free (wire_method); 576 return GNUNET_SYSERR; /* expected_fee >= 
wire_fee */ 577 } 578 GNUNET_free (wire_method); 579 return GNUNET_OK; 580 } 581 582 583 /** 584 * Closure for #check_transfer() 585 */ 586 struct CheckTransferContext 587 { 588 589 /** 590 * Pointer to the detail that we are currently 591 * checking in #check_transfer(). 592 */ 593 const struct TALER_TrackTransferDetails *current_detail; 594 595 /** 596 * Which transaction detail are we currently looking at? 597 */ 598 unsigned int current_offset; 599 600 /** 601 * #GNUNET_NO if we did not find a matching coin. 602 * #GNUNET_SYSERR if we found a matching coin, but the amounts do not match. 603 * #GNUNET_OK if we did find a matching coin. 604 */ 605 enum GNUNET_GenericReturnValue check_transfer_result; 606 607 /** 608 * Set to error code, if any. 609 */ 610 enum TALER_ErrorCode ec; 611 612 /** 613 * Set to true if @e ec indicates a permanent failure. 614 */ 615 bool failure; 616 }; 617 618 619 /** 620 * This function checks that the information about the coin which 621 * was paid back by _this_ wire transfer matches what _we_ (the merchant) 622 * knew about this coin. 
623 * 624 * @param cls closure with our `struct CheckTransferContext *` 625 * @param exchange_url URL of the exchange that issued @a coin_pub 626 * @param amount_with_fee amount the exchange will transfer for this coin 627 * @param deposit_fee fee the exchange will charge for this coin 628 * @param refund_fee fee the exchange will charge for refunding this coin 629 * @param wire_fee paid wire fee 630 * @param h_wire hash of merchant's wire details 631 * @param deposit_timestamp when did the exchange receive the deposit 632 * @param refund_deadline until when are refunds allowed 633 * @param exchange_sig signature by the exchange 634 * @param exchange_pub exchange signing key used for @a exchange_sig 635 */ 636 static void 637 check_transfer (void *cls, 638 const char *exchange_url, 639 const struct TALER_Amount *amount_with_fee, 640 const struct TALER_Amount *deposit_fee, 641 const struct TALER_Amount *refund_fee, 642 const struct TALER_Amount *wire_fee, 643 const struct TALER_MerchantWireHashP *h_wire, 644 struct GNUNET_TIME_Timestamp deposit_timestamp, 645 struct GNUNET_TIME_Timestamp refund_deadline, 646 const struct TALER_ExchangeSignatureP *exchange_sig, 647 const struct TALER_ExchangePublicKeyP *exchange_pub) 648 { 649 struct CheckTransferContext *ctc = cls; 650 const struct TALER_TrackTransferDetails *ttd = ctc->current_detail; 651 652 if (GNUNET_SYSERR == ctc->check_transfer_result) 653 { 654 GNUNET_break (0); 655 return; /* already had a serious issue; odd that we're called more than once as well... */ 656 } 657 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 658 "Checking coin with value %s\n", 659 TALER_amount2s (amount_with_fee)); 660 if ( (GNUNET_OK != 661 TALER_amount_cmp_currency (amount_with_fee, 662 &ttd->coin_value)) || 663 (0 != TALER_amount_cmp (amount_with_fee, 664 &ttd->coin_value)) ) 665 { 666 /* Disagreement between the exchange and us about how much this 667 coin is worth! 
*/ 668 GNUNET_break_op (0); 669 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 670 "Disagreement about coin value %s\n", 671 TALER_amount2s (amount_with_fee)); 672 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 673 "Exchange gave it a value of %s\n", 674 TALER_amount2s (&ttd->coin_value)); 675 ctc->check_transfer_result = GNUNET_SYSERR; 676 /* Build the `TrackTransferConflictDetails` */ 677 ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; 678 ctc->failure = true; 679 /* FIXME-#9426: this should be reported to the auditor (once the auditor has an API for this) */ 680 return; 681 } 682 if ( (GNUNET_OK != 683 TALER_amount_cmp_currency (deposit_fee, 684 &ttd->coin_fee)) || 685 (0 != TALER_amount_cmp (deposit_fee, 686 &ttd->coin_fee)) ) 687 { 688 /* Disagreement between the exchange and us about how much this 689 coin is worth! */ 690 GNUNET_break_op (0); 691 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 692 "Expected fee is %s\n", 693 TALER_amount2s (&ttd->coin_fee)); 694 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 695 "Fee claimed by exchange is %s\n", 696 TALER_amount2s (deposit_fee)); 697 ctc->check_transfer_result = GNUNET_SYSERR; 698 /* Build the `TrackTransferConflictDetails` */ 699 ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; 700 ctc->failure = true; 701 /* FIXME-#9426: this should be reported to the auditor (once the auditor has an API for this) */ 702 return; 703 } 704 ctc->check_transfer_result = GNUNET_OK; 705 } 706 707 708 /** 709 * Function called with detailed wire transfer data, including all 710 * of the coin transactions that were combined into the wire transfer. 
711 * 712 * @param cls closure a `struct Inquiry *` 713 * @param tgr response details 714 */ 715 static void 716 wire_transfer_cb (void *cls, 717 const struct TALER_EXCHANGE_GetTransfersResponse *tgr) 718 { 719 struct Inquiry *w = cls; 720 struct Exchange *e = w->exchange; 721 const struct TALER_EXCHANGE_TransferData *td = NULL; 722 723 e->exchange_inquiries--; 724 w->wdh = NULL; 725 if (EXCHANGE_INQUIRY_LIMIT - 1 == e->exchange_inquiries) 726 launch_inquiries_at_exchange (e); 727 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 728 "Got response code %u from exchange for GET /transfers/$WTID\n", 729 tgr->hr.http_status); 730 switch (tgr->hr.http_status) 731 { 732 case MHD_HTTP_OK: 733 td = &tgr->details.ok.td; 734 w->execution_time = td->execution_time; 735 e->transfer_delay = GNUNET_TIME_UNIT_ZERO; 736 break; 737 case MHD_HTTP_BAD_REQUEST: 738 case MHD_HTTP_FORBIDDEN: 739 case MHD_HTTP_NOT_FOUND: 740 found_problem = true; 741 update_transaction_status (w, 742 GNUNET_TIME_UNIT_FOREVER_ABS, 743 tgr->hr.http_status, 744 tgr->hr.ec, 745 tgr->hr.hint, 746 false); 747 end_inquiry (w); 748 return; 749 case MHD_HTTP_INTERNAL_SERVER_ERROR: 750 case MHD_HTTP_BAD_GATEWAY: 751 case MHD_HTTP_GATEWAY_TIMEOUT: 752 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 753 update_transaction_status (w, 754 GNUNET_TIME_relative_to_absolute ( 755 e->transfer_delay), 756 tgr->hr.http_status, 757 tgr->hr.ec, 758 tgr->hr.hint, 759 true); 760 end_inquiry (w); 761 return; 762 default: 763 found_problem = true; 764 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 765 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 766 "Unexpected HTTP status %u\n", 767 tgr->hr.http_status); 768 update_transaction_status (w, 769 GNUNET_TIME_relative_to_absolute ( 770 e->transfer_delay), 771 tgr->hr.http_status, 772 tgr->hr.ec, 773 tgr->hr.hint, 774 true); 775 end_inquiry (w); 776 return; 777 } 778 TALER_MERCHANTDB_preflight (pg); 779 780 { 781 enum GNUNET_DB_QueryStatus qs; 782 783 qs = 
TALER_MERCHANTDB_insert_transfer_details (pg, 784 w->instance_id, 785 w->exchange->exchange_url, 786 w->payto_uri, 787 &w->wtid, 788 td); 789 if (0 > qs) 790 { 791 /* Always report on DB error as well to enable diagnostics */ 792 GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs); 793 global_ret = EXIT_FAILURE; 794 GNUNET_SCHEDULER_shutdown (); 795 return; 796 } 797 // FIXME: insert_transfer_details has more complex 798 // error possibilities inside, expose them here 799 // and persist them with the transaction status 800 // if they arise (especially no_account, no_exchange, conflict) 801 // -- not sure how no_instance could happen... 802 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) 803 { 804 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 805 "Transfer already known. Ignoring duplicate.\n"); 806 return; 807 } 808 } 809 810 { 811 struct CheckTransferContext ctc = { 812 .ec = TALER_EC_NONE, 813 .failure = false 814 }; 815 816 for (unsigned int i = 0; i<td->details_length; i++) 817 { 818 const struct TALER_TrackTransferDetails *ttd = &td->details[i]; 819 enum GNUNET_DB_QueryStatus qs; 820 821 if (TALER_EC_NONE != ctc.ec) 822 break; /* already encountered an error */ 823 ctc.current_offset = i; 824 ctc.current_detail = ttd; 825 /* Set the coin as "never seen" before. */ 826 ctc.check_transfer_result = GNUNET_NO; 827 qs = TALER_MERCHANTDB_lookup_deposits_by_contract_and_coin ( 828 pg, 829 w->instance_id, 830 &ttd->h_contract_terms, 831 &ttd->coin_pub, 832 &check_transfer, 833 &ctc); 834 switch (qs) 835 { 836 case GNUNET_DB_STATUS_SOFT_ERROR: 837 GNUNET_break (0); 838 ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; 839 break; 840 case GNUNET_DB_STATUS_HARD_ERROR: 841 GNUNET_break (0); 842 ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; 843 break; 844 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 845 /* The exchange says we made this deposit, but WE do not 846 recall making it (corrupted / unreliable database?)! 847 Well, let's say thanks and accept the money! 
*/ 848 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 849 "Failed to find payment data in DB\n"); 850 ctc.check_transfer_result = GNUNET_OK; 851 break; 852 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 853 break; 854 } 855 switch (ctc.check_transfer_result) 856 { 857 case GNUNET_NO: 858 /* Internal error: how can we have called #check_transfer() 859 but still have no result? */ 860 GNUNET_break (0); 861 ctc.ec = TALER_EC_GENERIC_INTERNAL_INVARIANT_FAILURE; 862 return; 863 case GNUNET_SYSERR: 864 /* #check_transfer() failed, report conflict! */ 865 GNUNET_break_op (0); 866 GNUNET_assert (TALER_EC_NONE != ctc.ec); 867 break; 868 case GNUNET_OK: 869 break; 870 } 871 } 872 if (TALER_EC_NONE != ctc.ec) 873 { 874 update_transaction_status ( 875 w, 876 ctc.failure 877 ? GNUNET_TIME_UNIT_FOREVER_ABS 878 : GNUNET_TIME_relative_to_absolute ( 879 GNUNET_TIME_UNIT_MINUTES), 880 MHD_HTTP_OK, 881 ctc.ec, 882 NULL /* no hint */, 883 ! ctc.failure); 884 end_inquiry (w); 885 return; 886 } 887 } 888 889 if (GNUNET_SYSERR == 890 check_wire_fee (w, 891 td->execution_time, 892 &td->wire_fee)) 893 { 894 GNUNET_break_op (0); 895 update_transaction_status (w, 896 GNUNET_TIME_UNIT_FOREVER_ABS, 897 MHD_HTTP_OK, 898 TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_BAD_WIRE_FEE, 899 TALER_amount2s (&td->wire_fee), 900 false); 901 end_inquiry (w); 902 return; 903 } 904 905 { 906 enum GNUNET_DB_QueryStatus qs; 907 908 qs = TALER_MERCHANTDB_finalize_transfer_status (pg, 909 w->exchange->exchange_url, 910 &w->wtid, 911 &td->h_details, 912 &td->total_amount, 913 &td->wire_fee, 914 &td->exchange_pub, 915 &td->exchange_sig); 916 if (qs < 0) 917 { 918 GNUNET_break (0); 919 global_ret = EXIT_FAILURE; 920 GNUNET_SCHEDULER_shutdown (); 921 return; 922 } 923 } 924 end_inquiry (w); 925 } 926 927 928 /** 929 * Initiate download from an exchange for a given inquiry. 
930 * 931 * @param cls a `struct Inquiry *` 932 */ 933 static void 934 exchange_request (void *cls) 935 { 936 struct Inquiry *w = cls; 937 struct Exchange *e = w->exchange; 938 939 w->task = NULL; 940 if (NULL == e->keys) 941 return; 942 w->wdh = TALER_EXCHANGE_get_transfers_create ( 943 ctx, 944 e->exchange_url, 945 e->keys, 946 &w->wtid); 947 if (NULL == w->wdh) 948 { 949 GNUNET_break (0); 950 e->exchange_inquiries--; 951 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 952 update_transaction_status (w, 953 GNUNET_TIME_relative_to_absolute ( 954 e->transfer_delay), 955 0 /* failed to begin */, 956 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, 957 "Failed to initiate GET request at exchange", 958 true); 959 end_inquiry (w); 960 return; 961 } 962 GNUNET_assert (TALER_EC_NONE == 963 TALER_EXCHANGE_get_transfers_start (w->wdh, 964 &wire_transfer_cb, 965 w)); 966 967 /* Wait at least 1m for the network transfer */ 968 update_transaction_status (w, 969 GNUNET_TIME_relative_to_absolute ( 970 GNUNET_TIME_UNIT_MINUTES), 971 0 /* timeout */, 972 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_LIST, 973 "Initiated GET with exchange", 974 true); 975 } 976 977 978 /** 979 * Function called with information about a transfer we 980 * should ask the exchange about. 
981 * 982 * @param cls closure (NULL) 983 * @param rowid row of the transfer in the merchant database 984 * @param instance_id instance that received the transfer 985 * @param exchange_url base URL of the exchange that initiated the transfer 986 * @param payto_uri account of the merchant that received the transfer 987 * @param wtid wire transfer subject identifying the aggregation 988 * @param next_attempt when should we next try to interact with the exchange 989 */ 990 static void 991 start_inquiry ( 992 void *cls, 993 uint64_t rowid, 994 const char *instance_id, 995 const char *exchange_url, 996 struct TALER_FullPayto payto_uri, 997 const struct TALER_WireTransferIdentifierRawP *wtid, 998 struct GNUNET_TIME_Absolute next_attempt) 999 { 1000 struct Exchange *e; 1001 struct Inquiry *w; 1002 1003 (void) cls; 1004 if (GNUNET_TIME_absolute_is_future (next_attempt)) 1005 { 1006 if (NULL == task) 1007 task = GNUNET_SCHEDULER_add_at (next_attempt, 1008 &find_work, 1009 NULL); 1010 return; 1011 } 1012 active_inquiries++; 1013 1014 e = find_exchange (exchange_url); 1015 for (w = e->w_head; NULL != w; w = w->next) 1016 { 1017 if (0 == GNUNET_memcmp (&w->wtid, 1018 wtid)) 1019 { 1020 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1021 "Already processing inquiry. 
Aborting ongoing inquiry\n"); 1022 end_inquiry (w); 1023 break; 1024 } 1025 } 1026 1027 w = GNUNET_new (struct Inquiry); 1028 w->payto_uri.full_payto = GNUNET_strdup (payto_uri.full_payto); 1029 w->instance_id = GNUNET_strdup (instance_id); 1030 w->rowid = rowid; 1031 w->wtid = *wtid; 1032 GNUNET_CONTAINER_DLL_insert (e->w_head, 1033 e->w_tail, 1034 w); 1035 w->exchange = e; 1036 if (NULL != w->exchange->keys) 1037 w->task = GNUNET_SCHEDULER_add_now (&exchange_request, 1038 w); 1039 /* Wait at least 1 minute for /keys */ 1040 update_transaction_status (w, 1041 GNUNET_TIME_relative_to_absolute ( 1042 GNUNET_TIME_UNIT_MINUTES), 1043 0 /* timeout */, 1044 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_KEYS, 1045 exchange_url, 1046 true); 1047 } 1048 1049 1050 static void 1051 find_work (void *cls) 1052 { 1053 enum GNUNET_DB_QueryStatus qs; 1054 int limit; 1055 1056 (void) cls; 1057 task = NULL; 1058 GNUNET_assert (OPEN_INQUIRY_LIMIT >= active_inquiries); 1059 limit = OPEN_INQUIRY_LIMIT - active_inquiries; 1060 if (0 == limit) 1061 { 1062 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1063 "Not looking for work: at limit\n"); 1064 at_limit = true; 1065 return; 1066 } 1067 at_limit = false; 1068 qs = TALER_MERCHANTDB_select_open_transfers (pg, 1069 limit, 1070 &start_inquiry, 1071 NULL); 1072 if (qs < 0) 1073 { 1074 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1075 "Failed to obtain open transfers from database\n"); 1076 GNUNET_SCHEDULER_shutdown (); 1077 return; 1078 } 1079 if (qs >= limit) 1080 { 1081 /* DB limited response, re-trigger DB interaction 1082 the moment we significantly fall below the 1083 limit */ 1084 at_limit = true; 1085 } 1086 if (0 == active_inquiries) 1087 { 1088 if (test_mode) 1089 { 1090 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1091 "No more open inquiries and in test mode. 
Existing.\n"); 1092 GNUNET_SCHEDULER_shutdown (); 1093 return; 1094 } 1095 GNUNET_log ( 1096 GNUNET_ERROR_TYPE_INFO, 1097 "No open inquiries found, waiting for notification to resume\n"); 1098 } 1099 } 1100 1101 1102 /** 1103 * Function called when transfers are added to the merchant database. We look 1104 * for more work. 1105 * 1106 * @param cls closure (NULL) 1107 * @param extra additional event data provided 1108 * @param extra_size number of bytes in @a extra 1109 */ 1110 static void 1111 transfer_added (void *cls, 1112 const void *extra, 1113 size_t extra_size) 1114 { 1115 (void) cls; 1116 (void) extra; 1117 (void) extra_size; 1118 if (active_inquiries > OPEN_INQUIRY_LIMIT / 2) 1119 { 1120 /* Trigger DB only once we are substantially below the limit */ 1121 at_limit = true; 1122 return; 1123 } 1124 if (NULL != task) 1125 return; 1126 task = GNUNET_SCHEDULER_add_now (&find_work, 1127 NULL); 1128 } 1129 1130 1131 /** 1132 * Function called when keys were changed in the 1133 * merchant database. Updates ours. 1134 * 1135 * @param cls closure (NULL) 1136 * @param extra additional event data provided 1137 * @param extra_size number of bytes in @a extra 1138 */ 1139 static void 1140 keys_changed (void *cls, 1141 const void *extra, 1142 size_t extra_size) 1143 { 1144 const char *url = extra; 1145 struct Exchange *e; 1146 1147 (void) cls; 1148 if ( (NULL == extra) || 1149 (0 == extra_size) ) 1150 { 1151 GNUNET_break (0); 1152 return; 1153 } 1154 if ('\0' != url[extra_size - 1]) 1155 { 1156 GNUNET_break (0); 1157 return; 1158 } 1159 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1160 "Received keys change notification: reload `%s'\n", 1161 url); 1162 e = find_exchange (url); 1163 sync_keys (e); 1164 } 1165 1166 1167 /** 1168 * First task. 1169 * 1170 * @param cls closure, NULL 1171 * @param args remaining command-line arguments 1172 * @param cfgfile name of the configuration file used (for saving, can be NULL!) 
1173 * @param c configuration 1174 */ 1175 static void 1176 run (void *cls, 1177 char *const *args, 1178 const char *cfgfile, 1179 const struct GNUNET_CONFIGURATION_Handle *c) 1180 { 1181 (void) args; 1182 (void) cfgfile; 1183 1184 cfg = c; 1185 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 1186 NULL); 1187 ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, 1188 &rc); 1189 rc = GNUNET_CURL_gnunet_rc_create (ctx); 1190 if (NULL == ctx) 1191 { 1192 GNUNET_break (0); 1193 GNUNET_SCHEDULER_shutdown (); 1194 global_ret = EXIT_FAILURE; 1195 return; 1196 } 1197 if (NULL == 1198 (pg = TALER_MERCHANTDB_connect (cfg))) 1199 { 1200 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1201 "Failed to initialize DB subsystem. Consider running taler-merchant-dbconfig!\n"); 1202 GNUNET_SCHEDULER_shutdown (); 1203 global_ret = EXIT_FAILURE; 1204 return; 1205 } 1206 { 1207 struct GNUNET_DB_EventHeaderP es = { 1208 .size = htons (sizeof (es)), 1209 .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_EXPECTED) 1210 }; 1211 1212 eh = TALER_MERCHANTDB_event_listen (pg, 1213 &es, 1214 GNUNET_TIME_UNIT_FOREVER_REL, 1215 &transfer_added, 1216 NULL); 1217 } 1218 { 1219 struct GNUNET_DB_EventHeaderP es = { 1220 .size = htons (sizeof (es)), 1221 .type = htons (TALER_DBEVENT_MERCHANT_EXCHANGE_KEYS) 1222 }; 1223 1224 eh_keys 1225 = TALER_MERCHANTDB_event_listen (pg, 1226 &es, 1227 GNUNET_TIME_UNIT_FOREVER_REL, 1228 &keys_changed, 1229 NULL); 1230 } 1231 1232 GNUNET_assert (NULL == task); 1233 task = GNUNET_SCHEDULER_add_now (&find_work, 1234 NULL); 1235 } 1236 1237 1238 /** 1239 * The main function of taler-merchant-reconciliation 1240 * 1241 * @param argc number of arguments from the command line 1242 * @param argv command line arguments 1243 * @return 0 ok, 1 on error 1244 */ 1245 int 1246 main (int argc, 1247 char *const *argv) 1248 { 1249 struct GNUNET_GETOPT_CommandLineOption options[] = { 1250 GNUNET_GETOPT_option_timetravel ('T', 1251 "timetravel"), 1252 GNUNET_GETOPT_option_flag ('t', 
1253 "test", 1254 "run in test mode and exit when idle", 1255 &test_mode), 1256 GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), 1257 GNUNET_GETOPT_OPTION_END 1258 }; 1259 enum GNUNET_GenericReturnValue ret; 1260 1261 ret = GNUNET_PROGRAM_run ( 1262 TALER_MERCHANT_project_data (), 1263 argc, argv, 1264 "taler-merchant-reconciliation", 1265 gettext_noop ( 1266 "background process that reconciles bank transfers with orders by asking the exchange"), 1267 options, 1268 &run, NULL); 1269 if (GNUNET_SYSERR == ret) 1270 return EXIT_INVALIDARGUMENT; 1271 if (GNUNET_NO == ret) 1272 return EXIT_SUCCESS; 1273 if ( (found_problem) && 1274 (0 == global_ret) ) 1275 global_ret = 7; 1276 return global_ret; 1277 } 1278 1279 1280 /* end of taler-merchant-reconciliation.c */