taler-merchant-reconciliation.c (36049B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2023-2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file src/backend/taler-merchant-reconciliation.c 18 * @brief Process that reconciles information about incoming bank transfers with orders by asking the exchange 19 * @author Christian Grothoff 20 */ 21 #include "platform.h" 22 struct Inquiry; 23 #define TALER_EXCHANGE_GET_TRANSFERS_RESULT_CLOSURE struct Inquiry 24 #include "microhttpd.h" 25 #include <gnunet/gnunet_util_lib.h> 26 #include <jansson.h> 27 #include <pthread.h> 28 #include <taler/taler_dbevents.h> 29 #include "taler/taler_merchant_util.h" 30 #include "taler/taler_merchant_bank_lib.h" 31 #include "merchantdb_lib.h" 32 #include "merchantdb_lib.h" 33 #include "merchant-database/finalize_transfer_status.h" 34 #include "merchant-database/insert_transfer_details.h" 35 #include "merchant-database/lookup_deposits_by_contract_and_coin.h" 36 #include "merchant-database/lookup_wire_fee.h" 37 #include "merchant-database/select_exchange_keys.h" 38 #include "merchant-database/select_open_transfers.h" 39 #include "merchant-database/update_transfer_status.h" 40 #include "merchant-database/event_listen.h" 41 #include "merchant-database/preflight.h" 42 43 /** 44 * Timeout for the exchange interaction. Rather long as we should do 45 * long-polling and do not want to wake up too often. 
*/
#define EXCHANGE_TIMEOUT GNUNET_TIME_relative_multiply ( \
          GNUNET_TIME_UNIT_MINUTES, \
          30)

/**
 * How many inquiries do we process concurrently at most.
 * #find_work() asks the database for at most this many minus the
 * number of currently active inquiries.
 */
#define OPEN_INQUIRY_LIMIT 1024

/**
 * How many inquiries do we process concurrently per exchange at most.
 * Checked by #launch_inquiries_at_exchange() before scheduling a request.
 */
#define EXCHANGE_INQUIRY_LIMIT 16


/**
 * Information about an inquiry job.
 */
struct Inquiry;


/**
 * Information about an exchange.
 */
struct Exchange
{
  /**
   * Next exchange in the DLL of known exchanges (#e_head, #e_tail).
   */
  struct Exchange *next;

  /**
   * Previous exchange in the DLL of known exchanges (#e_head, #e_tail).
   */
  struct Exchange *prev;

  /**
   * Head of the DLL of inquiries at this exchange.  Inquiries are
   * inserted by #start_inquiry() regardless of whether a request
   * is already running for them.
   */
  struct Inquiry *w_head;

  /**
   * Tail of the DLL of inquiries at this exchange.
   */
  struct Inquiry *w_tail;

  /**
   * Which exchange are we tracking here.  Heap-allocated copy of
   * the base URL, released in #shutdown_task().
   */
  char *exchange_url;

  /**
   * The keys of this exchange; NULL until #sync_keys() obtained
   * them from the database.
   */
  struct TALER_EXCHANGE_Keys *keys;

  /**
   * How many active inquiries do we have right now with this exchange.
   * Compared against #EXCHANGE_INQUIRY_LIMIT.
   */
  unsigned int exchange_inquiries;

  /**
   * How long should we wait between requests
   * for transfer details?  Reset to zero on an HTTP 200 reply and
   * increased via GNUNET_TIME_STD_BACKOFF() on failures.
   */
  struct GNUNET_TIME_Relative transfer_delay;

};


/**
 * Information about an inquiry job.
 */
struct Inquiry
{
  /**
   * Next inquiry in the DLL of the exchange (w_head, w_tail).
   */
  struct Inquiry *next;

  /**
   * Previous inquiry in the DLL of the exchange (w_head, w_tail).
   */
  struct Inquiry *prev;

  /**
   * Handle to the exchange that made the transfer.
   */
  struct Exchange *exchange;

  /**
   * Task that will run #exchange_request() for this inquiry;
   * NULL if no such task is scheduled.
   */
  struct GNUNET_SCHEDULER_Task *task;

  /**
   * For which merchant instance is this tracking request?
   * Heap-allocated copy, released in #end_inquiry().
   */
  char *instance_id;

  /**
   * payto:// URI used for the transfer.
   * Heap-allocated copy, released in #end_inquiry().
   */
  struct TALER_FullPayto payto_uri;

  /**
   * Handle for the GET /transfers request; NULL while no request
   * is in flight.
   */
  struct TALER_EXCHANGE_GetTransfersHandle *wdh;

  /**
   * When did the transfer happen?  Filled in from the exchange's
   * reply in #wire_transfer_cb().
   */
  struct GNUNET_TIME_Timestamp execution_time;

  /**
   * Wire transfer identifier we ask the exchange about.
   */
  struct TALER_WireTransferIdentifierRawP wtid;

  /**
   * Row of the wire transfer in our database.
   */
  uint64_t rowid;

};


/**
 * Head of known exchanges.
 */
static struct Exchange *e_head;

/**
 * Tail of known exchanges.
 */
static struct Exchange *e_tail;

/**
 * The merchant's configuration.
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;

/**
 * Our database connection.
 */
static struct TALER_MERCHANTDB_PostgresContext *pg;

/**
 * CURL context used for the HTTP requests to the exchanges
 * (passed to TALER_EXCHANGE_get_transfers_create()).
 */
static struct GNUNET_CURL_Context *ctx;

/**
 * Scheduler context for running the @e ctx.
 */
static struct GNUNET_CURL_RescheduleContext *rc;

/**
 * Main task for #find_work(); NULL if #find_work() is not scheduled.
 */
static struct GNUNET_SCHEDULER_Task *task;

/**
 * Event handler to learn that there are new transfers
 * to check.
 */
static struct GNUNET_DB_EventHandler *eh;

/**
 * Event handler to learn that there may be new exchange
 * keys to check.
 */
static struct GNUNET_DB_EventHandler *eh_keys;

/**
 * How many active inquiries do we have right now.
 */
static unsigned int active_inquiries;

/**
 * Set to true if we ever encountered any problem.
 * Turns an otherwise successful exit code into 7 in main().
 */
static bool found_problem;

/**
 * Value to return from main(). 0 on success, non-zero on errors.
 */
static int global_ret;

/**
 * #GNUNET_YES if we are in test mode and should exit when idle.
239 */ 240 static int test_mode; 241 242 /** 243 * True if the last DB query was limited by the 244 * #OPEN_INQUIRY_LIMIT and we thus should check again 245 * as soon as we are substantially below that limit, 246 * and not only when we get a DB notification. 247 */ 248 static bool at_limit; 249 250 251 /** 252 * Initiate download from exchange. 253 * 254 * @param cls a `struct Inquiry *` 255 */ 256 static void 257 exchange_request (void *cls); 258 259 260 /** 261 * The exchange @a e is ready to handle more inquiries, 262 * prepare to launch them. 263 * 264 * @param[in,out] e exchange to potentially launch inquiries on 265 */ 266 static void 267 launch_inquiries_at_exchange (struct Exchange *e) 268 { 269 for (struct Inquiry *w = e->w_head; 270 NULL != w; 271 w = w->next) 272 { 273 if (e->exchange_inquiries >= EXCHANGE_INQUIRY_LIMIT) 274 break; 275 if ( (NULL == w->task) && 276 (NULL == w->wdh) ) 277 { 278 e->exchange_inquiries++; 279 w->task = GNUNET_SCHEDULER_add_now (&exchange_request, 280 w); 281 } 282 } 283 } 284 285 286 /** 287 * Updates the transaction status for inquiry @a w to the given values. 
288 * 289 * @param w inquiry to update status for 290 * @param next_attempt when should we retry @a w (if ever) 291 * @param http_status HTTP status of the response 292 * @param ec error code to use (if any) 293 * @param last_hint hint delivered with the response (if any, possibly NULL) 294 * @param needs_retry true if we should try the HTTP request again 295 */ 296 static void 297 update_transaction_status (const struct Inquiry *w, 298 struct GNUNET_TIME_Absolute next_attempt, 299 unsigned int http_status, 300 enum TALER_ErrorCode ec, 301 const char *last_hint, 302 bool needs_retry) 303 { 304 enum GNUNET_DB_QueryStatus qs; 305 306 qs = TALER_MERCHANTDB_update_transfer_status (pg, 307 w->exchange->exchange_url, 308 &w->wtid, 309 next_attempt, 310 http_status, 311 ec, 312 last_hint, 313 needs_retry); 314 if (qs < 0) 315 { 316 GNUNET_break (0); 317 global_ret = EXIT_FAILURE; 318 GNUNET_SCHEDULER_shutdown (); 319 return; 320 } 321 } 322 323 324 /** 325 * Interact with the database to get the current set 326 * of exchange keys known to us. 327 * 328 * @param e the exchange to check 329 */ 330 static void 331 sync_keys (struct Exchange *e) 332 { 333 enum GNUNET_DB_QueryStatus qs; 334 struct TALER_EXCHANGE_Keys *keys; 335 struct GNUNET_TIME_Absolute first_retry; 336 337 qs = TALER_MERCHANTDB_select_exchange_keys (pg, 338 e->exchange_url, 339 &first_retry, 340 &keys); 341 if (qs < 0) 342 { 343 GNUNET_break (0); 344 return; 345 } 346 if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) || 347 (NULL == keys) ) 348 { 349 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 350 "Cannot launch inquiries at `%s': lacking /keys response\n", 351 e->exchange_url); 352 return; 353 } 354 TALER_EXCHANGE_keys_decref (e->keys); 355 e->keys = keys; 356 launch_inquiries_at_exchange (e); 357 } 358 359 360 /** 361 * Lookup our internal data structure for the given 362 * @a exchange_url or create one if we do not yet have 363 * one. 
364 * 365 * @param exchange_url base URL of the exchange 366 * @return our state for this exchange 367 */ 368 static struct Exchange * 369 find_exchange (const char *exchange_url) 370 { 371 struct Exchange *e; 372 373 for (e = e_head; NULL != e; e = e->next) 374 if (0 == strcmp (exchange_url, 375 e->exchange_url)) 376 return e; 377 e = GNUNET_new (struct Exchange); 378 e->exchange_url = GNUNET_strdup (exchange_url); 379 GNUNET_CONTAINER_DLL_insert (e_head, 380 e_tail, 381 e); 382 sync_keys (e); 383 return e; 384 } 385 386 387 /** 388 * Finds new transfers that require work in the merchant database. 389 * 390 * @param cls NULL 391 */ 392 static void 393 find_work (void *cls); 394 395 396 /** 397 * Free resources of @a w. 398 * 399 * @param[in] w inquiry job to terminate 400 */ 401 static void 402 end_inquiry (struct Inquiry *w) 403 { 404 struct Exchange *e = w->exchange; 405 406 GNUNET_assert (active_inquiries > 0); 407 active_inquiries--; 408 if (NULL != w->wdh) 409 { 410 TALER_EXCHANGE_get_transfers_cancel (w->wdh); 411 w->wdh = NULL; 412 } 413 GNUNET_free (w->instance_id); 414 GNUNET_free (w->payto_uri.full_payto); 415 GNUNET_CONTAINER_DLL_remove (e->w_head, 416 e->w_tail, 417 w); 418 GNUNET_free (w); 419 if ( (active_inquiries < OPEN_INQUIRY_LIMIT / 2) && 420 (NULL == task) && 421 (at_limit) ) 422 { 423 at_limit = false; 424 GNUNET_assert (NULL == task); 425 task = GNUNET_SCHEDULER_add_now (&find_work, 426 NULL); 427 } 428 if ( (NULL == task) && 429 (! at_limit) && 430 (0 == active_inquiries) && 431 (test_mode) ) 432 { 433 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 434 "No more open inquiries and in test mode. Exiting.\n"); 435 GNUNET_SCHEDULER_shutdown (); 436 return; 437 } 438 } 439 440 441 /** 442 * We're being aborted with CTRL-C (or SIGTERM). Shut down. 
443 * 444 * @param cls closure (NULL) 445 */ 446 static void 447 shutdown_task (void *cls) 448 { 449 (void) cls; 450 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 451 "Running shutdown\n"); 452 while (NULL != e_head) 453 { 454 struct Exchange *e = e_head; 455 456 while (NULL != e->w_head) 457 { 458 struct Inquiry *w = e->w_head; 459 460 end_inquiry (w); 461 } 462 GNUNET_free (e->exchange_url); 463 if (NULL != e->keys) 464 { 465 TALER_EXCHANGE_keys_decref (e->keys); 466 e->keys = NULL; 467 } 468 GNUNET_CONTAINER_DLL_remove (e_head, 469 e_tail, 470 e); 471 GNUNET_free (e); 472 } 473 if (NULL != eh) 474 { 475 TALER_MERCHANTDB_event_listen_cancel (eh); 476 eh = NULL; 477 } 478 if (NULL != eh_keys) 479 { 480 TALER_MERCHANTDB_event_listen_cancel (eh_keys); 481 eh_keys = NULL; 482 } 483 if (NULL != task) 484 { 485 GNUNET_SCHEDULER_cancel (task); 486 task = NULL; 487 } 488 if (NULL != pg) 489 { 490 TALER_MERCHANTDB_disconnect (pg); 491 pg = NULL; 492 } 493 cfg = NULL; 494 if (NULL != ctx) 495 { 496 GNUNET_CURL_fini (ctx); 497 ctx = NULL; 498 } 499 if (NULL != rc) 500 { 501 GNUNET_CURL_gnunet_rc_destroy (rc); 502 rc = NULL; 503 } 504 } 505 506 507 /** 508 * Check that the given @a wire_fee is what the @a e should charge 509 * at the @a execution_time. If the fee is correct (according to our 510 * database), return #GNUNET_OK. If we do not have the fee structure in our 511 * DB, we just accept it and return #GNUNET_NO; if we have proof that the fee 512 * is bogus, we respond with the proof to the client and return 513 * #GNUNET_SYSERR. 
514 * 515 * @param w inquiry to check fees of 516 * @param execution_time time of the wire transfer 517 * @param wire_fee fee claimed by the exchange 518 * @return #GNUNET_SYSERR if we returned hard proof of 519 * missbehavior from the exchange to the client 520 */ 521 static enum GNUNET_GenericReturnValue 522 check_wire_fee (struct Inquiry *w, 523 struct GNUNET_TIME_Timestamp execution_time, 524 const struct TALER_Amount *wire_fee) 525 { 526 struct Exchange *e = w->exchange; 527 const struct TALER_EXCHANGE_Keys *keys = e->keys; 528 struct TALER_WireFeeSet fees; 529 struct TALER_MasterSignatureP master_sig; 530 struct GNUNET_TIME_Timestamp start_date; 531 struct GNUNET_TIME_Timestamp end_date; 532 enum GNUNET_DB_QueryStatus qs; 533 char *wire_method; 534 535 if (NULL == keys) 536 { 537 GNUNET_break (0); 538 return GNUNET_NO; 539 } 540 wire_method = TALER_payto_get_method (w->payto_uri.full_payto); 541 qs = TALER_MERCHANTDB_lookup_wire_fee (pg, 542 &keys->master_pub, 543 wire_method, 544 execution_time, 545 &fees, 546 &start_date, 547 &end_date, 548 &master_sig); 549 switch (qs) 550 { 551 case GNUNET_DB_STATUS_HARD_ERROR: 552 GNUNET_break (0); 553 GNUNET_free (wire_method); 554 return GNUNET_SYSERR; 555 case GNUNET_DB_STATUS_SOFT_ERROR: 556 GNUNET_free (wire_method); 557 return GNUNET_NO; 558 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 559 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 560 "Failed to find wire fee for `%s' and method `%s' at %s in DB, accepting blindly that the fee is %s\n", 561 TALER_B2S (&keys->master_pub), 562 wire_method, 563 GNUNET_TIME_timestamp2s (execution_time), 564 TALER_amount2s (wire_fee)); 565 GNUNET_free (wire_method); 566 return GNUNET_OK; 567 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 568 break; 569 } 570 if ( (GNUNET_OK != 571 TALER_amount_cmp_currency (&fees.wire, 572 wire_fee)) || 573 (0 > TALER_amount_cmp (&fees.wire, 574 wire_fee)) ) 575 { 576 GNUNET_break_op (0); 577 GNUNET_free (wire_method); 578 return GNUNET_SYSERR; /* expected_fee >= 
wire_fee */ 579 } 580 GNUNET_free (wire_method); 581 return GNUNET_OK; 582 } 583 584 585 /** 586 * Closure for #check_transfer() 587 */ 588 struct CheckTransferContext 589 { 590 591 /** 592 * Pointer to the detail that we are currently 593 * checking in #check_transfer(). 594 */ 595 const struct TALER_TrackTransferDetails *current_detail; 596 597 /** 598 * Which transaction detail are we currently looking at? 599 */ 600 unsigned int current_offset; 601 602 /** 603 * #GNUNET_NO if we did not find a matching coin. 604 * #GNUNET_SYSERR if we found a matching coin, but the amounts do not match. 605 * #GNUNET_OK if we did find a matching coin. 606 */ 607 enum GNUNET_GenericReturnValue check_transfer_result; 608 609 /** 610 * Set to error code, if any. 611 */ 612 enum TALER_ErrorCode ec; 613 614 /** 615 * Set to true if @e ec indicates a permanent failure. 616 */ 617 bool failure; 618 }; 619 620 621 /** 622 * This function checks that the information about the coin which 623 * was paid back by _this_ wire transfer matches what _we_ (the merchant) 624 * knew about this coin. 
625 * 626 * @param cls closure with our `struct CheckTransferContext *` 627 * @param exchange_url URL of the exchange that issued @a coin_pub 628 * @param amount_with_fee amount the exchange will transfer for this coin 629 * @param deposit_fee fee the exchange will charge for this coin 630 * @param refund_fee fee the exchange will charge for refunding this coin 631 * @param wire_fee paid wire fee 632 * @param h_wire hash of merchant's wire details 633 * @param deposit_timestamp when did the exchange receive the deposit 634 * @param refund_deadline until when are refunds allowed 635 * @param exchange_sig signature by the exchange 636 * @param exchange_pub exchange signing key used for @a exchange_sig 637 */ 638 static void 639 check_transfer (void *cls, 640 const char *exchange_url, 641 const struct TALER_Amount *amount_with_fee, 642 const struct TALER_Amount *deposit_fee, 643 const struct TALER_Amount *refund_fee, 644 const struct TALER_Amount *wire_fee, 645 const struct TALER_MerchantWireHashP *h_wire, 646 struct GNUNET_TIME_Timestamp deposit_timestamp, 647 struct GNUNET_TIME_Timestamp refund_deadline, 648 const struct TALER_ExchangeSignatureP *exchange_sig, 649 const struct TALER_ExchangePublicKeyP *exchange_pub) 650 { 651 struct CheckTransferContext *ctc = cls; 652 const struct TALER_TrackTransferDetails *ttd = ctc->current_detail; 653 654 if (GNUNET_SYSERR == ctc->check_transfer_result) 655 { 656 GNUNET_break (0); 657 return; /* already had a serious issue; odd that we're called more than once as well... */ 658 } 659 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 660 "Checking coin with value %s\n", 661 TALER_amount2s (amount_with_fee)); 662 if ( (GNUNET_OK != 663 TALER_amount_cmp_currency (amount_with_fee, 664 &ttd->coin_value)) || 665 (0 != TALER_amount_cmp (amount_with_fee, 666 &ttd->coin_value)) ) 667 { 668 /* Disagreement between the exchange and us about how much this 669 coin is worth! 
*/ 670 GNUNET_break_op (0); 671 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 672 "Disagreement about coin value %s\n", 673 TALER_amount2s (amount_with_fee)); 674 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 675 "Exchange gave it a value of %s\n", 676 TALER_amount2s (&ttd->coin_value)); 677 ctc->check_transfer_result = GNUNET_SYSERR; 678 /* Build the `TrackTransferConflictDetails` */ 679 ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; 680 ctc->failure = true; 681 /* FIXME-#9426: this should be reported to the auditor (once the auditor has an API for this) */ 682 return; 683 } 684 if ( (GNUNET_OK != 685 TALER_amount_cmp_currency (deposit_fee, 686 &ttd->coin_fee)) || 687 (0 != TALER_amount_cmp (deposit_fee, 688 &ttd->coin_fee)) ) 689 { 690 /* Disagreement between the exchange and us about how much this 691 coin is worth! */ 692 GNUNET_break_op (0); 693 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 694 "Expected fee is %s\n", 695 TALER_amount2s (&ttd->coin_fee)); 696 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 697 "Fee claimed by exchange is %s\n", 698 TALER_amount2s (deposit_fee)); 699 ctc->check_transfer_result = GNUNET_SYSERR; 700 /* Build the `TrackTransferConflictDetails` */ 701 ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; 702 ctc->failure = true; 703 /* FIXME-#9426: this should be reported to the auditor (once the auditor has an API for this) */ 704 return; 705 } 706 ctc->check_transfer_result = GNUNET_OK; 707 } 708 709 710 /** 711 * Function called with detailed wire transfer data, including all 712 * of the coin transactions that were combined into the wire transfer. 
713 * 714 * @param cls closure a `struct Inquiry *` 715 * @param tgr response details 716 */ 717 static void 718 wire_transfer_cb (struct Inquiry *w, 719 const struct TALER_EXCHANGE_GetTransfersResponse *tgr) 720 { 721 struct Exchange *e = w->exchange; 722 const struct TALER_EXCHANGE_TransferData *td = NULL; 723 724 e->exchange_inquiries--; 725 w->wdh = NULL; 726 if (EXCHANGE_INQUIRY_LIMIT - 1 == e->exchange_inquiries) 727 launch_inquiries_at_exchange (e); 728 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 729 "Got response code %u from exchange for GET /transfers/$WTID\n", 730 tgr->hr.http_status); 731 switch (tgr->hr.http_status) 732 { 733 case MHD_HTTP_OK: 734 td = &tgr->details.ok.td; 735 w->execution_time = td->execution_time; 736 e->transfer_delay = GNUNET_TIME_UNIT_ZERO; 737 break; 738 case MHD_HTTP_BAD_REQUEST: 739 case MHD_HTTP_FORBIDDEN: 740 case MHD_HTTP_NOT_FOUND: 741 found_problem = true; 742 update_transaction_status (w, 743 GNUNET_TIME_UNIT_FOREVER_ABS, 744 tgr->hr.http_status, 745 tgr->hr.ec, 746 tgr->hr.hint, 747 false); 748 end_inquiry (w); 749 return; 750 case MHD_HTTP_INTERNAL_SERVER_ERROR: 751 case MHD_HTTP_BAD_GATEWAY: 752 case MHD_HTTP_GATEWAY_TIMEOUT: 753 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 754 update_transaction_status (w, 755 GNUNET_TIME_relative_to_absolute ( 756 e->transfer_delay), 757 tgr->hr.http_status, 758 tgr->hr.ec, 759 tgr->hr.hint, 760 true); 761 end_inquiry (w); 762 return; 763 default: 764 found_problem = true; 765 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 766 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 767 "Unexpected HTTP status %u\n", 768 tgr->hr.http_status); 769 update_transaction_status (w, 770 GNUNET_TIME_relative_to_absolute ( 771 e->transfer_delay), 772 tgr->hr.http_status, 773 tgr->hr.ec, 774 tgr->hr.hint, 775 true); 776 end_inquiry (w); 777 return; 778 } 779 TALER_MERCHANTDB_preflight (pg); 780 781 { 782 enum GNUNET_DB_QueryStatus qs; 783 784 qs = 
TALER_MERCHANTDB_insert_transfer_details (pg, 785 w->instance_id, 786 w->exchange->exchange_url, 787 w->payto_uri, 788 &w->wtid, 789 td); 790 if (0 > qs) 791 { 792 /* Always report on DB error as well to enable diagnostics */ 793 GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs); 794 global_ret = EXIT_FAILURE; 795 GNUNET_SCHEDULER_shutdown (); 796 return; 797 } 798 // FIXME: insert_transfer_details has more complex 799 // error possibilities inside, expose them here 800 // and persist them with the transaction status 801 // if they arise (especially no_account, no_exchange, conflict) 802 // -- not sure how no_instance could happen... 803 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) 804 { 805 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 806 "Transfer already known. Ignoring duplicate.\n"); 807 return; 808 } 809 } 810 811 { 812 struct CheckTransferContext ctc = { 813 .ec = TALER_EC_NONE, 814 .failure = false 815 }; 816 817 for (unsigned int i = 0; i<td->details_length; i++) 818 { 819 const struct TALER_TrackTransferDetails *ttd = &td->details[i]; 820 enum GNUNET_DB_QueryStatus qs; 821 822 if (TALER_EC_NONE != ctc.ec) 823 break; /* already encountered an error */ 824 ctc.current_offset = i; 825 ctc.current_detail = ttd; 826 /* Set the coin as "never seen" before. */ 827 ctc.check_transfer_result = GNUNET_NO; 828 qs = TALER_MERCHANTDB_lookup_deposits_by_contract_and_coin ( 829 pg, 830 w->instance_id, 831 &ttd->h_contract_terms, 832 &ttd->coin_pub, 833 &check_transfer, 834 &ctc); 835 switch (qs) 836 { 837 case GNUNET_DB_STATUS_SOFT_ERROR: 838 GNUNET_break (0); 839 ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; 840 break; 841 case GNUNET_DB_STATUS_HARD_ERROR: 842 GNUNET_break (0); 843 ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; 844 break; 845 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 846 /* The exchange says we made this deposit, but WE do not 847 recall making it (corrupted / unreliable database?)! 848 Well, let's say thanks and accept the money! 
*/ 849 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 850 "Failed to find payment data in DB\n"); 851 ctc.check_transfer_result = GNUNET_OK; 852 break; 853 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 854 break; 855 } 856 switch (ctc.check_transfer_result) 857 { 858 case GNUNET_NO: 859 /* Internal error: how can we have called #check_transfer() 860 but still have no result? */ 861 GNUNET_break (0); 862 ctc.ec = TALER_EC_GENERIC_INTERNAL_INVARIANT_FAILURE; 863 return; 864 case GNUNET_SYSERR: 865 /* #check_transfer() failed, report conflict! */ 866 GNUNET_break_op (0); 867 GNUNET_assert (TALER_EC_NONE != ctc.ec); 868 break; 869 case GNUNET_OK: 870 break; 871 } 872 } 873 if (TALER_EC_NONE != ctc.ec) 874 { 875 update_transaction_status ( 876 w, 877 ctc.failure 878 ? GNUNET_TIME_UNIT_FOREVER_ABS 879 : GNUNET_TIME_relative_to_absolute ( 880 GNUNET_TIME_UNIT_MINUTES), 881 MHD_HTTP_OK, 882 ctc.ec, 883 NULL /* no hint */, 884 ! ctc.failure); 885 end_inquiry (w); 886 return; 887 } 888 } 889 890 if (GNUNET_SYSERR == 891 check_wire_fee (w, 892 td->execution_time, 893 &td->wire_fee)) 894 { 895 GNUNET_break_op (0); 896 update_transaction_status (w, 897 GNUNET_TIME_UNIT_FOREVER_ABS, 898 MHD_HTTP_OK, 899 TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_BAD_WIRE_FEE, 900 TALER_amount2s (&td->wire_fee), 901 false); 902 end_inquiry (w); 903 return; 904 } 905 906 { 907 enum GNUNET_DB_QueryStatus qs; 908 909 qs = TALER_MERCHANTDB_finalize_transfer_status (pg, 910 w->exchange->exchange_url, 911 &w->wtid, 912 &td->h_details, 913 &td->total_amount, 914 &td->wire_fee, 915 &td->exchange_pub, 916 &td->exchange_sig); 917 if (qs < 0) 918 { 919 GNUNET_break (0); 920 global_ret = EXIT_FAILURE; 921 GNUNET_SCHEDULER_shutdown (); 922 return; 923 } 924 } 925 end_inquiry (w); 926 } 927 928 929 /** 930 * Initiate download from an exchange for a given inquiry. 
931 * 932 * @param cls a `struct Inquiry *` 933 */ 934 static void 935 exchange_request (void *cls) 936 { 937 struct Inquiry *w = cls; 938 struct Exchange *e = w->exchange; 939 940 w->task = NULL; 941 if (NULL == e->keys) 942 return; 943 w->wdh = TALER_EXCHANGE_get_transfers_create ( 944 ctx, 945 e->exchange_url, 946 e->keys, 947 &w->wtid); 948 if (NULL == w->wdh) 949 { 950 GNUNET_break (0); 951 e->exchange_inquiries--; 952 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 953 update_transaction_status (w, 954 GNUNET_TIME_relative_to_absolute ( 955 e->transfer_delay), 956 0 /* failed to begin */, 957 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, 958 "Failed to initiate GET request at exchange", 959 true); 960 end_inquiry (w); 961 return; 962 } 963 GNUNET_assert (TALER_EC_NONE == 964 TALER_EXCHANGE_get_transfers_start (w->wdh, 965 &wire_transfer_cb, 966 w)); 967 968 /* Wait at least 1m for the network transfer */ 969 update_transaction_status (w, 970 GNUNET_TIME_relative_to_absolute ( 971 GNUNET_TIME_UNIT_MINUTES), 972 0 /* timeout */, 973 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_LIST, 974 "Initiated GET with exchange", 975 true); 976 } 977 978 979 /** 980 * Function called with information about a transfer we 981 * should ask the exchange about. 
982 * 983 * @param cls closure (NULL) 984 * @param rowid row of the transfer in the merchant database 985 * @param instance_id instance that received the transfer 986 * @param exchange_url base URL of the exchange that initiated the transfer 987 * @param payto_uri account of the merchant that received the transfer 988 * @param wtid wire transfer subject identifying the aggregation 989 * @param next_attempt when should we next try to interact with the exchange 990 */ 991 static void 992 start_inquiry ( 993 void *cls, 994 uint64_t rowid, 995 const char *instance_id, 996 const char *exchange_url, 997 struct TALER_FullPayto payto_uri, 998 const struct TALER_WireTransferIdentifierRawP *wtid, 999 struct GNUNET_TIME_Absolute next_attempt) 1000 { 1001 struct Exchange *e; 1002 struct Inquiry *w; 1003 1004 (void) cls; 1005 if (GNUNET_TIME_absolute_is_future (next_attempt)) 1006 { 1007 if (NULL == task) 1008 task = GNUNET_SCHEDULER_add_at (next_attempt, 1009 &find_work, 1010 NULL); 1011 return; 1012 } 1013 active_inquiries++; 1014 1015 e = find_exchange (exchange_url); 1016 for (w = e->w_head; NULL != w; w = w->next) 1017 { 1018 if (0 == GNUNET_memcmp (&w->wtid, 1019 wtid)) 1020 { 1021 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1022 "Already processing inquiry. 
Aborting ongoing inquiry\n"); 1023 end_inquiry (w); 1024 break; 1025 } 1026 } 1027 1028 w = GNUNET_new (struct Inquiry); 1029 w->payto_uri.full_payto = GNUNET_strdup (payto_uri.full_payto); 1030 w->instance_id = GNUNET_strdup (instance_id); 1031 w->rowid = rowid; 1032 w->wtid = *wtid; 1033 GNUNET_CONTAINER_DLL_insert (e->w_head, 1034 e->w_tail, 1035 w); 1036 w->exchange = e; 1037 if (NULL != w->exchange->keys) 1038 w->task = GNUNET_SCHEDULER_add_now (&exchange_request, 1039 w); 1040 /* Wait at least 1 minute for /keys */ 1041 update_transaction_status (w, 1042 GNUNET_TIME_relative_to_absolute ( 1043 GNUNET_TIME_UNIT_MINUTES), 1044 0 /* timeout */, 1045 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_KEYS, 1046 exchange_url, 1047 true); 1048 } 1049 1050 1051 static void 1052 find_work (void *cls) 1053 { 1054 enum GNUNET_DB_QueryStatus qs; 1055 int limit; 1056 1057 (void) cls; 1058 task = NULL; 1059 GNUNET_assert (OPEN_INQUIRY_LIMIT >= active_inquiries); 1060 limit = OPEN_INQUIRY_LIMIT - active_inquiries; 1061 if (0 == limit) 1062 { 1063 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1064 "Not looking for work: at limit\n"); 1065 at_limit = true; 1066 return; 1067 } 1068 at_limit = false; 1069 qs = TALER_MERCHANTDB_select_open_transfers (pg, 1070 limit, 1071 &start_inquiry, 1072 NULL); 1073 if (qs < 0) 1074 { 1075 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1076 "Failed to obtain open transfers from database\n"); 1077 GNUNET_SCHEDULER_shutdown (); 1078 return; 1079 } 1080 if (qs >= limit) 1081 { 1082 /* DB limited response, re-trigger DB interaction 1083 the moment we significantly fall below the 1084 limit */ 1085 at_limit = true; 1086 } 1087 if (0 == active_inquiries) 1088 { 1089 if (test_mode) 1090 { 1091 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1092 "No more open inquiries and in test mode. 
Existing.\n"); 1093 GNUNET_SCHEDULER_shutdown (); 1094 return; 1095 } 1096 GNUNET_log ( 1097 GNUNET_ERROR_TYPE_INFO, 1098 "No open inquiries found, waiting for notification to resume\n"); 1099 } 1100 } 1101 1102 1103 /** 1104 * Function called when transfers are added to the merchant database. We look 1105 * for more work. 1106 * 1107 * @param cls closure (NULL) 1108 * @param extra additional event data provided 1109 * @param extra_size number of bytes in @a extra 1110 */ 1111 static void 1112 transfer_added (void *cls, 1113 const void *extra, 1114 size_t extra_size) 1115 { 1116 (void) cls; 1117 (void) extra; 1118 (void) extra_size; 1119 if (active_inquiries > OPEN_INQUIRY_LIMIT / 2) 1120 { 1121 /* Trigger DB only once we are substantially below the limit */ 1122 at_limit = true; 1123 return; 1124 } 1125 if (NULL != task) 1126 return; 1127 task = GNUNET_SCHEDULER_add_now (&find_work, 1128 NULL); 1129 } 1130 1131 1132 /** 1133 * Function called when keys were changed in the 1134 * merchant database. Updates ours. 1135 * 1136 * @param cls closure (NULL) 1137 * @param extra additional event data provided 1138 * @param extra_size number of bytes in @a extra 1139 */ 1140 static void 1141 keys_changed (void *cls, 1142 const void *extra, 1143 size_t extra_size) 1144 { 1145 const char *url = extra; 1146 struct Exchange *e; 1147 1148 (void) cls; 1149 if ( (NULL == extra) || 1150 (0 == extra_size) ) 1151 { 1152 GNUNET_break (0); 1153 return; 1154 } 1155 if ('\0' != url[extra_size - 1]) 1156 { 1157 GNUNET_break (0); 1158 return; 1159 } 1160 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1161 "Received keys change notification: reload `%s'\n", 1162 url); 1163 e = find_exchange (url); 1164 sync_keys (e); 1165 } 1166 1167 1168 /** 1169 * First task. 1170 * 1171 * @param cls closure, NULL 1172 * @param args remaining command-line arguments 1173 * @param cfgfile name of the configuration file used (for saving, can be NULL!) 
1174 * @param c configuration 1175 */ 1176 static void 1177 run (void *cls, 1178 char *const *args, 1179 const char *cfgfile, 1180 const struct GNUNET_CONFIGURATION_Handle *c) 1181 { 1182 (void) args; 1183 (void) cfgfile; 1184 1185 cfg = c; 1186 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 1187 NULL); 1188 ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, 1189 &rc); 1190 rc = GNUNET_CURL_gnunet_rc_create (ctx); 1191 if (NULL == ctx) 1192 { 1193 GNUNET_break (0); 1194 GNUNET_SCHEDULER_shutdown (); 1195 global_ret = EXIT_FAILURE; 1196 return; 1197 } 1198 if (NULL == 1199 (pg = TALER_MERCHANTDB_connect (cfg))) 1200 { 1201 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1202 "Failed to initialize DB subsystem. Consider running taler-merchant-dbconfig!\n"); 1203 GNUNET_SCHEDULER_shutdown (); 1204 global_ret = EXIT_FAILURE; 1205 return; 1206 } 1207 { 1208 struct GNUNET_DB_EventHeaderP es = { 1209 .size = htons (sizeof (es)), 1210 .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_EXPECTED) 1211 }; 1212 1213 eh = TALER_MERCHANTDB_event_listen (pg, 1214 &es, 1215 GNUNET_TIME_UNIT_FOREVER_REL, 1216 &transfer_added, 1217 NULL); 1218 } 1219 { 1220 struct GNUNET_DB_EventHeaderP es = { 1221 .size = htons (sizeof (es)), 1222 .type = htons (TALER_DBEVENT_MERCHANT_EXCHANGE_KEYS) 1223 }; 1224 1225 eh_keys 1226 = TALER_MERCHANTDB_event_listen (pg, 1227 &es, 1228 GNUNET_TIME_UNIT_FOREVER_REL, 1229 &keys_changed, 1230 NULL); 1231 } 1232 1233 GNUNET_assert (NULL == task); 1234 task = GNUNET_SCHEDULER_add_now (&find_work, 1235 NULL); 1236 } 1237 1238 1239 /** 1240 * The main function of taler-merchant-reconciliation 1241 * 1242 * @param argc number of arguments from the command line 1243 * @param argv command line arguments 1244 * @return 0 ok, 1 on error 1245 */ 1246 int 1247 main (int argc, 1248 char *const *argv) 1249 { 1250 struct GNUNET_GETOPT_CommandLineOption options[] = { 1251 GNUNET_GETOPT_option_timetravel ('T', 1252 "timetravel"), 1253 GNUNET_GETOPT_option_flag ('t', 
1254 "test", 1255 "run in test mode and exit when idle", 1256 &test_mode), 1257 GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), 1258 GNUNET_GETOPT_OPTION_END 1259 }; 1260 enum GNUNET_GenericReturnValue ret; 1261 1262 ret = GNUNET_PROGRAM_run ( 1263 TALER_MERCHANT_project_data (), 1264 argc, argv, 1265 "taler-merchant-reconciliation", 1266 gettext_noop ( 1267 "background process that reconciles bank transfers with orders by asking the exchange"), 1268 options, 1269 &run, NULL); 1270 if (GNUNET_SYSERR == ret) 1271 return EXIT_INVALIDARGUMENT; 1272 if (GNUNET_NO == ret) 1273 return EXIT_SUCCESS; 1274 if ( (found_problem) && 1275 (0 == global_ret) ) 1276 global_ret = 7; 1277 return global_ret; 1278 } 1279 1280 1281 /* end of taler-merchant-reconciliation.c */