taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit df153ed4bc1b3bd3786db54158f82130adcb413f
parent 166a13dc2b4971a6b83d70dbc5d1f3fc1f01d08b
Author: Antoine A <>
Date:   Thu, 26 Mar 2026 15:55:18 +0100

common: add Wire Transfer Gateway support

Diffstat:
M.gitignore | 5+++--
Mtaler-cyclos/Cargo.toml | 1+
Mtaler-cyclos/db/cyclos-0002.sql | 35+++++++++++++++++++++++++++++++++--
Mtaler-cyclos/db/cyclos-procedures.sql | 182++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mtaler-cyclos/src/api.rs | 85++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------
Mtaler-cyclos/src/db.rs | 80+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------
Mtaler-cyclos/src/worker.rs | 18+++++++++++++++---
Mtaler-cyclos/tests/api.rs | 135++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------
Mtaler-magnet-bank/Cargo.toml | 1+
Mtaler-magnet-bank/db/magnet-bank-0001.sql | 11+++++------
Mtaler-magnet-bank/db/magnet-bank-0002.sql | 31+++++++++++++++++++++++++++++--
Mtaler-magnet-bank/db/magnet-bank-procedures.sql | 193++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mtaler-magnet-bank/src/api.rs | 85++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------
Mtaler-magnet-bank/src/db.rs | 88++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------
Mtaler-magnet-bank/src/worker.rs | 15++++++++++++---
Mtaler-magnet-bank/tests/api.rs | 133++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------
16 files changed, 954 insertions(+), 144 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -11,4 +11,5 @@ debian/taler-magnet-bank debian/taler-cyclos debian/files debian/*.substvars -debian/*debhelper* -\ No newline at end of file +debian/*debhelper* +postgres-language-server.jsonc +\ No newline at end of file diff --git a/taler-cyclos/Cargo.toml b/taler-cyclos/Cargo.toml @@ -32,3 +32,4 @@ url.workspace = true [dev-dependencies] taler-test-utils.workspace = true +rand.workspace = true diff --git a/taler-cyclos/db/cyclos-0002.sql b/taler-cyclos/db/cyclos-0002.sql @@ -19,4 +19,35 @@ SET search_path TO cyclos; -- Add outgoing transactions metadata field ALTER TABLE taler_out ADD COLUMN metadata TEXT; -ALTER TABLE transfer ADD COLUMN metadata TEXT; -\ No newline at end of file +ALTER TABLE transfer ADD COLUMN metadata TEXT; + +-- Replace unused wad type with new mapping type +ALTER TYPE incoming_type RENAME VALUE 'wad' TO 'map'; + +ALTER TABLE taler_in + ADD COLUMN authorization_pub BYTEA CHECK (LENGTH(authorization_pub)=32), + ADD COLUMN authorization_sig BYTEA CHECK (LENGTH(authorization_sig)=64); + +CREATE TABLE prepared_in ( + type incoming_type NOT NULL, + account_pub BYTEA NOT NULL CHECK (LENGTH(account_pub)=32), + authorization_pub BYTEA UNIQUE NOT NULL CHECK (LENGTH(authorization_pub)=32), + authorization_sig BYTEA NOT NULL CHECK (LENGTH(authorization_sig)=64), + recurrent BOOLEAN NOT NULL, + registered_at INT8 NOT NULL, + tx_in_id INT8 UNIQUE REFERENCES tx_in(tx_in_id) ON DELETE CASCADE +); +COMMENT ON TABLE prepared_in IS 'Prepared incoming transaction'; +CREATE UNIQUE INDEX prepared_in_unique_reserve_pub + ON prepared_in (account_pub) WHERE type = 'reserve'; + +CREATE TABLE pending_recurrent_in( + tx_in_id INT8 NOT NULL UNIQUE REFERENCES tx_in(tx_in_id) ON DELETE CASCADE, + authorization_pub BYTEA NOT NULL REFERENCES prepared_in(authorization_pub) +); +CREATE INDEX pending_recurrent_inc_auth_pub + ON pending_recurrent_in (authorization_pub); +COMMENT ON TABLE pending_recurrent_in IS 'Pending recurrent incoming transaction'; + +ALTER TABLE bounced +ALTER COLUMN chargeback_id DROP NOT NULL; +\ No newline at end of file diff --git a/taler-cyclos/db/cyclos-procedures.sql b/taler-cyclos/db/cyclos-procedures.sql @@ -51,13 +51,20 @@ CREATE FUNCTION register_tx_in( IN in_now INT8, -- Error status OUT out_reserve_pub_reuse BOOLEAN, + OUT out_mapping_reuse BOOLEAN, + OUT out_unknown_mapping BOOLEAN, -- Success return OUT out_tx_row_id INT8, OUT out_valued_at INT8, - OUT out_new BOOLEAN + OUT out_new BOOLEAN, + OUT out_pending BOOLEAN ) LANGUAGE plpgsql AS $$ +DECLARE +local_authorization_pub BYTEA; +local_authorization_sig BYTEA; BEGIN +out_pending=false; -- Check for idempotence SELECT tx_in_id, valued_at INTO out_tx_row_id, out_valued_at @@ -69,9 +76,23 @@ IF NOT out_new THEN RETURN; END IF; +-- Resolve mapping logic +IF in_type = 'map' THEN + SELECT type, account_pub, authorization_pub, authorization_sig, + tx_in_id IS NOT NULL AND NOT recurrent, + tx_in_id IS NOT NULL AND recurrent + INTO in_type, in_metadata, local_authorization_pub, local_authorization_sig, out_mapping_reuse, out_pending + FROM prepared_in + WHERE authorization_pub = in_metadata; + out_unknown_mapping = NOT FOUND; + IF out_unknown_mapping OR out_mapping_reuse THEN + RETURN; + END IF; +END IF; + + -- Check conflict -SELECT in_type = 'reserve'::incoming_type AND EXISTS(SELECT FROM taler_in WHERE metadata = in_metadata AND type = 'reserve') - INTO out_reserve_pub_reuse; +out_reserve_pub_reuse=NOT out_pending AND in_type = 'reserve' AND EXISTS(SELECT FROM taler_in WHERE metadata = in_metadata AND type = 'reserve'); IF out_reserve_pub_reuse THEN RETURN; END IF; @@ -100,7 +121,17 @@ INSERT INTO tx_in ( RETURNING tx_in_id INTO out_tx_row_id; -- Notify new incoming transaction registration PERFORM pg_notify('tx_in', out_tx_row_id || ''); -IF in_type IS NOT NULL THEN + +IF out_pending THEN + -- Delay talerable registration until mapping again + INSERT INTO pending_recurrent_in (tx_in_id, authorization_pub) + VALUES (out_tx_row_id, local_authorization_pub); +ELSIF in_type IS NOT NULL THEN + IF local_authorization_pub IS NOT NULL THEN + UPDATE prepared_in + SET tx_in_id = out_tx_row_id + WHERE authorization_pub = local_authorization_pub; + END IF; -- Insert new incoming talerable transaction INSERT INTO taler_in ( tx_in_id, @@ -376,4 +407,143 @@ INSERT INTO bounced ( in_reason ) ON CONFLICT (chargeback_id) DO NOTHING; END $$; -COMMENT ON FUNCTION register_bounced_tx_in IS 'Register a bounced incoming transaction idempotently'; -\ No newline at end of file +COMMENT ON FUNCTION register_bounced_tx_in IS 'Register a bounced incoming transaction idempotently'; + +CREATE FUNCTION register_prepared_transfers ( + IN in_type incoming_type, + IN in_account_pub BYTEA, + IN in_authorization_pub BYTEA, + IN in_authorization_sig BYTEA, + IN in_recurrent BOOLEAN, + IN in_timestamp INT8, + -- Error status + OUT out_reserve_pub_reuse BOOLEAN +) +LANGUAGE plpgsql AS $$ +DECLARE + talerable_tx INT8; + idempotent BOOLEAN; +BEGIN + +-- Check idempotency +SELECT type = in_type + AND account_pub = in_account_pub + AND recurrent = in_recurrent +INTO idempotent +FROM prepared_in +WHERE authorization_pub = in_authorization_pub; + +-- Check idempotency and delay garbage collection +IF FOUND AND idempotent THEN + UPDATE prepared_in + SET registered_at=in_timestamp + WHERE authorization_pub=in_authorization_pub; + RETURN; +END IF; + +-- Check reserve pub reuse +out_reserve_pub_reuse=in_type = 'reserve' AND ( + EXISTS(SELECT FROM taler_in WHERE metadata = in_account_pub AND type = 'reserve') + OR EXISTS(SELECT FROM prepared_in WHERE account_pub = in_account_pub AND type = 'reserve' AND authorization_pub != in_authorization_pub) +); +IF out_reserve_pub_reuse THEN + RETURN; +END IF; + +IF in_recurrent THEN + -- Finalize one pending right now + WITH moved_tx AS ( + DELETE FROM pending_recurrent_in + WHERE tx_in_id = ( + SELECT tx_in_id + FROM pending_recurrent_in + JOIN tx_in USING (tx_in_id) + WHERE authorization_pub = in_authorization_pub + ORDER BY registered_at ASC + LIMIT 1 + ) + RETURNING tx_in_id + ) + INSERT INTO taler_in (tx_in_id, type, metadata, authorization_pub, authorization_sig) + SELECT moved_tx.tx_in_id, in_type, in_account_pub, in_authorization_pub, in_authorization_sig + FROM moved_tx + RETURNING tx_in_id INTO talerable_tx; + IF talerable_tx IS NOT NULL THEN + PERFORM pg_notify('taler_in', talerable_tx::text); + END IF; +ELSE + -- Bounce all pending + WITH pending AS ( + DELETE FROM pending_recurrent_in + WHERE authorization_pub = in_authorization_pub + RETURNING tx_in_id + ) + INSERT INTO bounced ( + tx_in_id, + chargeback_id, + reason + ) + SELECT + tx_in_id, + NULL, + 'cancelled mapping' + FROM pending; +END IF; + +-- Upsert registration +INSERT INTO prepared_in ( + type, + account_pub, + authorization_pub, + authorization_sig, + recurrent, + registered_at, + tx_in_id +) VALUES ( + in_type, + in_account_pub, + in_authorization_pub, + in_authorization_sig, + in_recurrent, + in_timestamp, + talerable_tx +) ON CONFLICT (authorization_pub) +DO UPDATE SET + type = EXCLUDED.type, + account_pub = EXCLUDED.account_pub, + recurrent = EXCLUDED.recurrent, + registered_at = EXCLUDED.registered_at, + tx_in_id = EXCLUDED.tx_in_id, + authorization_sig = EXCLUDED.authorization_sig; +END $$; + +CREATE FUNCTION delete_prepared_transfers ( + IN in_authorization_pub BYTEA, + OUT out_found BOOLEAN +) +LANGUAGE plpgsql AS $$ +BEGIN + +-- Bounce all pending +WITH pending AS ( + DELETE FROM pending_recurrent_in + WHERE authorization_pub = in_authorization_pub + RETURNING tx_in_id +) +INSERT INTO bounced ( + tx_in_id, + chargeback_id, + reason +) +SELECT + tx_in_id, + NULL, + 'cancelled mapping' +FROM pending; + +-- Delete registration +DELETE FROM prepared_in +WHERE authorization_pub = in_authorization_pub; +out_found = FOUND; + +END $$; +\ No newline at end of file diff --git a/taler-cyclos/src/api.rs b/taler-cyclos/src/api.rs @@ -17,21 +17,25 @@ use compact_str::CompactString; use jiff::Timestamp; use taler_api::{ - api::{TalerApi, revenue::Revenue, wire::WireGateway}, - error::{ApiResult, failure}, - subject::IncomingSubject, + api::{TalerApi, revenue::Revenue, transfer::WireTransferGateway, wire::WireGateway}, + error::{ApiResult, failure, failure_code}, + subject::{IncomingSubject, fmt_in_subject}, }; use taler_common::{ api_common::{SafeU64, safe_u64}, api_params::{History, Page}, api_revenue::RevenueIncomingHistory, + api_transfer::{ + RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject, Unregistration, + }, api_wire::{ AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, TransferState, TransferStatus, }, + db::IncomingType, error_code::ErrorCode, - types::{amount::Currency, payto::PaytoURI}, + types::{amount::Currency, payto::PaytoURI, timestamp::TalerTimestamp}, }; use tokio::sync::watch::Sender; @@ -115,14 +119,12 @@ impl WireGateway for CyclosApi { timestamp: initiated_at.into(), row_id: SafeU64::try_from(id).unwrap(), }), - db::TransferResult::RequestUidReuse => Err(failure( - ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED, - "request_uid used already", - )), - db::TransferResult::WtidReuse => Err(failure( - ErrorCode::BANK_TRANSFER_WTID_REUSED, - "wtid used already", - )), + db::TransferResult::RequestUidReuse => { + Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED)) + } + db::TransferResult::WtidReuse => { + Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED)) + } } } @@ -194,10 +196,12 @@ impl WireGateway for CyclosApi { row_id: safe_u64(row_id), timestamp: valued_at.into(), }), - AddIncomingResult::ReservePubReuse => Err(failure( - ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, - "reserve_pub used already".to_owned(), - )), + AddIncomingResult::ReservePubReuse => { + Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) + } + AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { + unreachable!("mapping unused") + } } } @@ -222,10 +226,12 @@ impl WireGateway for CyclosApi { row_id: safe_u64(row_id), timestamp: valued_at.into(), }), - AddIncomingResult::ReservePubReuse => Err(failure( - ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, - "reserve_pub used already".to_owned(), - )), + AddIncomingResult::ReservePubReuse => { + Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) + } + AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { + unreachable!("mapping unused") + } } } @@ -249,3 +255,42 @@ impl Revenue for CyclosApi { }) } } + +impl WireTransferGateway for CyclosApi { + fn supported_formats(&self) -> &[SubjectFormat] { + &[SubjectFormat::SIMPLE] + } + + async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> { + match db::transfer_register(&self.pool, &req).await? { + db::RegistrationResult::Success => { + let simple = TransferSubject::Simple { + credit_amount: req.credit_amount, + subject: if req.authorization_pub == req.account_pub && !req.recurrent { + fmt_in_subject(req.r#type.into(), &req.account_pub) + } else { + fmt_in_subject(IncomingType::map, &req.authorization_pub) + }, + }; + ApiResult::Ok(RegistrationResponse { + subjects: vec![simple], + expiration: TalerTimestamp::Never, + }) + } + db::RegistrationResult::ReservePubReuse => { + ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) + } + } + } + + async fn unregistration(&self, req: Unregistration) -> ApiResult<()> { + if !db::transfer_unregister(&self.pool, &req).await? { + Err(failure( + ErrorCode::BANK_TRANSACTION_NOT_FOUND, + format!("Prepared transfer '{}' not found", req.authorization_pub), + )) + } else { + Ok(()) + } + } +} diff --git a/taler-cyclos/src/db.rs b/taler-cyclos/src/db.rs @@ -29,6 +29,7 @@ use taler_common::{ api_common::{HashCode, ShortHashCode}, api_params::{History, Page}, api_revenue::RevenueIncomingBankTransaction, + api_transfer::{RegistrationRequest, Unregistration}, api_wire::{ IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState, TransferStatus, @@ -191,10 +192,13 @@ pub struct TxInAdmin { pub enum AddIncomingResult { Success { new: bool, + pending: bool, row_id: u64, valued_at: Timestamp, }, ReservePubReuse, + UnknownMapping, + MappingReuse, } /// Lock the database for worker execution @@ -213,7 +217,7 @@ pub async fn register_tx_in_admin( serialized!( sqlx::query( " - SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new + SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5) ", ) @@ -224,14 +228,19 @@ pub async fn register_tx_in_admin( .bind_timestamp(now) .bind(tx.metadata.ty()) .bind(tx.metadata.key()) - .try_map(|r: PgRow| { + .try_map(|r: PgRow| { Ok(if r.try_get_flag(0)? { AddIncomingResult::ReservePubReuse + } else if r.try_get_flag(1)? { + AddIncomingResult::MappingReuse + } else if r.try_get_flag(2)? { + AddIncomingResult::UnknownMapping } else { AddIncomingResult::Success { - row_id: r.try_get_u64(1)?, - valued_at: r.try_get_timestamp(2)?, - new: r.try_get(3)?, + row_id: r.try_get_u64(3)?, + valued_at: r.try_get_timestamp(4)?, + new: r.try_get(5)?, + pending: r.try_get(6)? } }) }) @@ -248,7 +257,7 @@ pub async fn register_tx_in( serialized!( sqlx::query( " - SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new + SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ", ) @@ -265,11 +274,16 @@ pub async fn register_tx_in( .try_map(|r: PgRow| { Ok(if r.try_get_flag(0)? { AddIncomingResult::ReservePubReuse + } else if r.try_get_flag(1)? { + AddIncomingResult::MappingReuse + } else if r.try_get_flag(2)? { + AddIncomingResult::UnknownMapping } else { AddIncomingResult::Success { - row_id: r.try_get_u64(1)?, - valued_at: r.try_get_timestamp(2)?, - new: r.try_get(3)?, + row_id: r.try_get_u64(3)?, + valued_at: r.try_get_timestamp(4)?, + new: r.try_get(5)?, + pending: r.try_get(6)? } }) }) @@ -805,6 +819,46 @@ pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) - Ok(()) } +pub enum RegistrationResult { + Success, + ReservePubReuse, +} + +pub async fn transfer_register( + db: &PgPool, + req: &RegistrationRequest, +) -> sqlx::Result<RegistrationResult> { + let ty: IncomingType = req.r#type.into(); + serialized!( + sqlx::query( + "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)" + ) + .bind(ty) + .bind(&req.account_pub) + .bind(&req.authorization_pub) + .bind(&req.authorization_sig) + .bind(req.recurrent) + .bind_timestamp(&Timestamp::now()) + .try_map(|r: PgRow| { + Ok(if r.try_get_flag("out_reserve_pub_reuse")? { + RegistrationResult::ReservePubReuse + } else { + RegistrationResult::Success + }) + }) + .fetch_one(db) + ) +} + +pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> { + serialized!( + sqlx::query("SELECT out_found FROM delete_prepared_transfers($1)") + .bind(&req.authorization_pub) + .try_map(|r: PgRow| r.try_get_flag("out_found")) + .fetch_one(db) + ) +} + pub trait CyclosTypeHelper { fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>( &self, @@ -946,8 +1000,9 @@ mod test { .expect("register tx in"), AddIncomingResult::Success { new: true, + pending: false, row_id: id, - valued_at: now + valued_at: now, } ); // Idempotent @@ -965,6 +1020,7 @@ mod test { .expect("register tx in"), AddIncomingResult::Success { new: false, + pending: false, row_id: id, valued_at: now } @@ -985,6 +1041,7 @@ mod test { .expect("register tx in"), AddIncomingResult::Success { new: true, + pending: false, row_id: id + 1, valued_at: later } @@ -1067,6 +1124,7 @@ mod test { .expect("register tx in"), AddIncomingResult::Success { new: true, + pending: false, row_id: 1, valued_at: now } @@ -1078,6 +1136,7 @@ mod test { .expect("register tx in"), AddIncomingResult::Success { new: false, + pending: false, row_id: 1, valued_at: now } @@ -1097,6 +1156,7 @@ mod test { .expect("register tx in"), AddIncomingResult::Success { new: true, + pending: false, row_id: 2, valued_at: later } diff --git a/taler-cyclos/src/worker.rs b/taler-cyclos/src/worker.rs @@ -299,7 +299,9 @@ impl Worker<'_> { trace!(target: "worker", "in {tx} already seen and cannot bounce "); } } - AddIncomingResult::ReservePubReuse => unreachable!(), + AddIncomingResult::ReservePubReuse + | AddIncomingResult::UnknownMapping + | AddIncomingResult::MappingReuse => unreachable!(), } } else { let chargeback_id = self.client.chargeback(*transfer.id).await?; @@ -342,7 +344,9 @@ impl Worker<'_> { trace!(target: "worker", "in {tx} chargeback already seen"); } } - AddIncomingResult::ReservePubReuse => unreachable!(), + AddIncomingResult::ReservePubReuse + | AddIncomingResult::UnknownMapping + | AddIncomingResult::MappingReuse => unreachable!(), } return Ok(()); @@ -363,6 +367,12 @@ impl Worker<'_> { AddIncomingResult::ReservePubReuse => { bounce(self.db, "reserve pub reuse").await? } + AddIncomingResult::UnknownMapping => { + bounce(self.db, "unknown mapping").await? + } + AddIncomingResult::MappingReuse => { + bounce(self.db, "mapping reuse").await? + } } } Err(e) => bounce(self.db, &e.to_string()).await?, @@ -377,7 +387,9 @@ impl Worker<'_> { trace!(target: "worker", "in {tx} already seen"); } } - AddIncomingResult::ReservePubReuse => unreachable!(), + AddIncomingResult::ReservePubReuse + | AddIncomingResult::UnknownMapping + | AddIncomingResult::MappingReuse => unreachable!(), } } } diff --git a/taler-cyclos/tests/api.rs b/taler-cyclos/tests/api.rs @@ -14,45 +14,63 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::{str::FromStr, sync::Arc}; +use std::{ + str::FromStr, + sync::{Arc, LazyLock}, +}; use compact_str::CompactString; use jiff::Timestamp; -use sqlx::PgPool; -use taler_api::{api::TalerRouter as _, auth::AuthMethod, subject::OutgoingSubject}; +use sqlx::{PgPool, Row as _, postgres::PgRow}; +use taler_api::{ + api::TalerRouter, + auth::AuthMethod, + db::TypeHelper as _, + subject::{IncomingSubject, OutgoingSubject}, +}; use taler_common::{ + api_common::EddsaPublicKey, api_revenue::RevenueConfig, + api_transfer::WireTransferConfig, api_wire::{OutgoingHistory, TransferState, WireConfig}, + db::IncomingType, types::{ amount::{Currency, decimal}, - payto::payto, + payto::{PaytoURI, payto}, }, }; use taler_cyclos::{ api::CyclosApi, constants::CONFIG_SOURCE, - db::{self, TxOutKind}, + db::{self, AddIncomingResult, TxIn, TxOutKind}, }; use taler_test_utils::{ Router, db::db_test_setup, - routine::{admin_add_incoming_routine, revenue_routine, routine_pagination, transfer_routine}, + routine::{ + Status, admin_add_incoming_routine, registration_routine, revenue_routine, + routine_pagination, transfer_routine, + }, server::TestServer, }; +static ACCOUNT: LazyLock<PaytoURI> = + LazyLock::new(|| payto("payto://cyclos/localhost/7762070814178012479?receiver-name=name")); + async fn setup() -> (Router, PgPool) { let (_, pool) = db_test_setup(CONFIG_SOURCE).await; let api = Arc::new( CyclosApi::start( pool.clone(), CompactString::const_new("localhost"), - payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), + ACCOUNT.clone(), Currency::from_str("TEST").unwrap(), ) .await, ); let server = Router::new() .wire_gateway(api.clone(), AuthMethod::None) + .wire_transfer_gateway(api.clone()) .revenue(api, AuthMethod::None) .finalize(); @@ -67,6 +85,10 @@ async fn config() { .await .assert_ok_json::<WireConfig>(); server + .get("/taler-wire-transfer-gateway/config") + .await + .assert_ok_json::<WireTransferConfig>(); + server .get("/taler-revenue/config") .await .assert_ok_json::<RevenueConfig>(); @@ -75,12 +97,7 @@ async fn config() { #[tokio::test] async fn transfer() { let (server, _) = setup().await; - transfer_routine( - &server, - TransferState::pending, - &payto("payto://cyclos/localhost/7762070814178012479?receiver-name=name"), - ) - .await; + transfer_routine(&server, TransferState::pending, &ACCOUNT).await; } #[tokio::test] @@ -124,21 +141,95 @@ async fn outgoing_history() { #[tokio::test] async fn admin_add_incoming() { let (server, _) = setup().await; - admin_add_incoming_routine( - &server, - &payto("payto://cyclos/localhost/7762070814178012479?receiver-name=name"), - true, - ) - .await; + admin_add_incoming_routine(&server, &ACCOUNT, true).await; } #[tokio::test] async fn revenue() { let (server, _) = setup().await; - revenue_routine( + revenue_routine(&server, &ACCOUNT, true).await; +} + +async fn check_in(pool: &PgPool) -> Vec<Status> { + sqlx::query( + " + SELECT pending_recurrent_in.authorization_pub IS NOT NULL, bounced.tx_in_id IS NOT NULL, type, metadata + FROM tx_in + LEFT JOIN taler_in USING (tx_in_id) + LEFT JOIN pending_recurrent_in USING (tx_in_id) + LEFT JOIN bounced USING (tx_in_id) + ORDER BY tx_in.tx_in_id + ", + ) + .try_map(|r: PgRow| { + Ok( + if r.try_get_flag(0)? { + Status::Pending + } else if r.try_get_flag(1)? { + Status::Bounced + } else { + match r.try_get(2)? { + None => Status::Simple, + Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?), + Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?), + Some(e) => unreachable!("{e:?}") + } + } + ) + }) + .fetch_all(pool) + .await + .unwrap() +} + +pub async fn test_in(pool: &PgPool, key: EddsaPublicKey) { + let tx = TxIn { + transfer_id: rand::random_range(10..10000), + tx_id: None, + amount: decimal("12"), + subject: String::new(), + debtor_id: rand::random_range(10..10000), + debtor_name: "Name".to_owned(), + valued_at: Timestamp::now(), + }; + let mut db = pool.acquire().await.unwrap(); + let reason = match db::register_tx_in( + &mut db, + &tx, + &Some(IncomingSubject::Map(key)), + &Timestamp::now(), + ) + .await + .unwrap() + { + AddIncomingResult::Success { .. } => return, + AddIncomingResult::ReservePubReuse => "reserve pub reuse", + AddIncomingResult::UnknownMapping => "unknown mapping", + AddIncomingResult::MappingReuse => "mapping reuse", + }; + db::register_bounced_tx_in( + &mut db, + &tx, + rand::random_range(10..10000), + reason, + &Timestamp::now(), + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn registration() { + let (server, pool) = setup().await; + registration_routine( &server, - &payto("payto://cyclos/localhost/7762070814178012479?receiver-name=name"), - true, + &ACCOUNT, + || check_in(&pool), + |account_pub| { + let account_pub = account_pub.clone(); + let pool = &pool; + async move { test_in(pool, account_pub).await } + }, ) .await; } diff --git a/taler-magnet-bank/Cargo.toml b/taler-magnet-bank/Cargo.toml @@ -39,3 +39,4 @@ compact_str.workspace = true [dev-dependencies] taler-test-utils.workspace = true +rand.workspace = true diff --git a/taler-magnet-bank/db/magnet-bank-0001.sql b/taler-magnet-bank/db/magnet-bank-0001.sql @@ -43,7 +43,7 @@ CREATE TABLE tx_out( valued_at INT8 NOT NULL, registered_at INT8 NOT NULL ); -COMMENT ON TABLE tx_in IS 'Outgoing transactions'; +COMMENT ON TABLE tx_out IS 'Outgoing transactions'; CREATE TYPE incoming_type AS ENUM ('reserve' ,'kyc', 'wad'); @@ -61,8 +61,7 @@ CREATE TABLE taler_in( END ) ); -COMMENT ON TABLE tx_in IS 'Incoming talerable transactions'; - +COMMENT ON TABLE taler_in IS 'Incoming talerable transactions'; CREATE UNIQUE INDEX taler_in_unique_reserve_pub ON taler_in (metadata) WHERE type = 'reserve'; CREATE TABLE taler_out( @@ -70,7 +69,7 @@ CREATE TABLE taler_out( wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32), exchange_base_url TEXT NOT NULL ); -COMMENT ON TABLE tx_in IS 'Outgoing talerable transactions'; +COMMENT ON TABLE taler_out IS 'Outgoing talerable transactions'; CREATE TYPE transfer_status AS ENUM( 'pending', @@ -95,7 +94,7 @@ CREATE TABLE initiated( tx_out_id INT8 UNIQUE REFERENCES tx_out(tx_out_id) ON DELETE CASCADE, initiated_at INT8 NOT NULL ); -COMMENT ON TABLE tx_in IS 'Initiated outgoing transactions'; +COMMENT ON TABLE initiated IS 'Initiated outgoing transactions'; CREATE TABLE transfer( initiated_id INT8 PRIMARY KEY REFERENCES initiated(initiated_id) ON DELETE CASCADE, @@ -110,7 +109,7 @@ CREATE TABLE bounced( initiated_id INT8 NOT NULL UNIQUE REFERENCES initiated(initiated_id) ON DELETE CASCADE, reason TEXT NOT NULL ); -COMMENT ON TABLE tx_in IS 'Bounced transactions'; +COMMENT ON TABLE bounced IS 'Bounced transactions'; CREATE TABLE kv( key TEXT NOT NULL UNIQUE PRIMARY KEY, diff --git a/taler-magnet-bank/db/magnet-bank-0002.sql b/taler-magnet-bank/db/magnet-bank-0002.sql @@ -19,4 +19,32 @@ SET search_path TO magnet_bank; -- Add outgoing transactions metadata field ALTER TABLE taler_out ADD COLUMN metadata TEXT; -ALTER TABLE transfer ADD COLUMN metadata TEXT; -\ No newline at end of file +ALTER TABLE transfer ADD COLUMN metadata TEXT; + +-- Replace unused wad type with new mapping type +ALTER TYPE incoming_type RENAME VALUE 'wad' TO 'map'; + +ALTER TABLE taler_in + ADD COLUMN authorization_pub BYTEA CHECK (LENGTH(authorization_pub)=32), + ADD COLUMN authorization_sig BYTEA CHECK (LENGTH(authorization_sig)=64); + +CREATE TABLE prepared_in ( + type incoming_type NOT NULL, + account_pub BYTEA NOT NULL CHECK (LENGTH(account_pub)=32), + authorization_pub BYTEA UNIQUE NOT NULL CHECK (LENGTH(authorization_pub)=32), + authorization_sig BYTEA NOT NULL CHECK (LENGTH(authorization_sig)=64), + recurrent BOOLEAN NOT NULL, + registered_at INT8 NOT NULL, + tx_in_id INT8 UNIQUE REFERENCES tx_in(tx_in_id) ON DELETE CASCADE +); +COMMENT ON TABLE prepared_in IS 'Prepared incoming transaction'; +CREATE UNIQUE INDEX prepared_in_unique_reserve_pub + ON prepared_in (account_pub) WHERE type = 'reserve'; + +CREATE TABLE pending_recurrent_in( + tx_in_id INT8 NOT NULL UNIQUE REFERENCES tx_in(tx_in_id) ON DELETE CASCADE, + authorization_pub BYTEA NOT NULL REFERENCES prepared_in(authorization_pub) +); +CREATE INDEX pending_recurrent_inc_auth_pub + ON pending_recurrent_in (authorization_pub); +COMMENT ON TABLE pending_recurrent_in IS 'Pending recurrent incoming transaction'; diff --git a/taler-magnet-bank/db/magnet-bank-procedures.sql b/taler-magnet-bank/db/magnet-bank-procedures.sql @@ -50,13 +50,20 @@ CREATE FUNCTION register_tx_in( IN in_now INT8, -- Error status OUT out_reserve_pub_reuse BOOLEAN, + OUT out_mapping_reuse BOOLEAN, + OUT out_unknown_mapping BOOLEAN, -- Success return OUT out_tx_row_id INT8, OUT out_valued_at INT8, - OUT out_new BOOLEAN + OUT out_new BOOLEAN, + OUT out_pending BOOLEAN ) LANGUAGE plpgsql AS $$ +DECLARE +local_authorization_pub BYTEA; +local_authorization_sig BYTEA; BEGIN +out_pending=false; -- Check for idempotence SELECT tx_in_id, valued_at INTO out_tx_row_id, out_valued_at @@ -68,9 +75,22 @@ IF NOT out_new THEN RETURN; END IF; +-- Resolve mapping logic +IF in_type = 'map' THEN + SELECT type, account_pub, authorization_pub, authorization_sig, + tx_in_id IS NOT NULL AND NOT recurrent, + tx_in_id IS NOT NULL AND recurrent + INTO in_type, in_metadata, local_authorization_pub, local_authorization_sig, out_mapping_reuse, out_pending + FROM prepared_in + WHERE authorization_pub = in_metadata; + out_unknown_mapping = NOT FOUND; + IF out_unknown_mapping OR out_mapping_reuse THEN + RETURN; + END IF; +END IF; + -- Check conflict -SELECT in_type = 'reserve'::incoming_type AND EXISTS(SELECT FROM taler_in WHERE metadata = in_metadata AND type = 'reserve') - INTO out_reserve_pub_reuse; +out_reserve_pub_reuse=NOT out_pending AND in_type = 'reserve' AND EXISTS(SELECT FROM taler_in WHERE metadata = in_metadata AND type = 'reserve'); IF out_reserve_pub_reuse THEN RETURN; END IF; @@ -97,7 +117,17 @@ INSERT INTO tx_in ( RETURNING tx_in_id INTO out_tx_row_id; -- Notify new incoming transaction registration PERFORM pg_notify('tx_in', out_tx_row_id || ''); -IF in_type IS NOT NULL THEN + +IF out_pending THEN + -- Delay talerable registration until mapping again + INSERT INTO pending_recurrent_in (tx_in_id, authorization_pub) + VALUES (out_tx_row_id, local_authorization_pub); +ELSIF in_type IS NOT NULL THEN + IF local_authorization_pub IS NOT NULL THEN + UPDATE prepared_in + SET tx_in_id = out_tx_row_id + WHERE authorization_pub = local_authorization_pub; + END IF; -- Insert new incoming talerable transaction INSERT INTO taler_in ( tx_in_id, @@ -341,7 +371,6 @@ CREATE FUNCTION register_bounce_tx_in( IN in_debit_account TEXT, IN in_debit_name TEXT, IN in_valued_at INT8, - IN in_bounce_amount taler_amount, IN in_reason TEXT, IN in_now INT8, -- Success return @@ -392,4 +421,154 @@ IF out_bounce_new THEN ); END IF; END $$; -COMMENT ON FUNCTION register_bounce_tx_in IS 'Register an incoming transaction and bounce it idempotently'; -\ No newline at end of file +COMMENT ON FUNCTION register_bounce_tx_in IS 'Register an incoming transaction and bounce it idempotently'; + +CREATE FUNCTION bounce_pending( + in_authorization_pub BYTEA, + in_timestamp INT8 +) +RETURNS void +LANGUAGE plpgsql AS $$ +DECLARE + local_tx_id INT8; + local_initiated_id INTEGER; +BEGIN +FOR local_tx_id IN + DELETE FROM pending_recurrent_in + WHERE authorization_pub = in_authorization_pub + RETURNING tx_in_id +LOOP + INSERT INTO initiated ( + amount, + subject, + credit_account, + credit_name, + initiated_at + ) + SELECT + amount, + 'bounce: ' || magnet_code, + debit_account, + debit_name, + in_timestamp + FROM tx_in + WHERE tx_in_id = local_tx_id + RETURNING initiated_id INTO local_initiated_id; + + INSERT INTO bounced (tx_in_id, initiated_id, reason) + VALUES (local_tx_id, local_initiated_id, 'cancelled mapping'); +END LOOP; +END; +$$; + +CREATE FUNCTION register_prepared_transfers ( + IN in_type incoming_type, + IN in_account_pub BYTEA, + IN in_authorization_pub BYTEA, + IN in_authorization_sig BYTEA, + IN in_recurrent BOOLEAN, + IN in_timestamp INT8, + -- Error status + OUT out_reserve_pub_reuse BOOLEAN +) +LANGUAGE plpgsql AS $$ +DECLARE + talerable_tx INT8; + idempotent BOOLEAN; +BEGIN + +-- Check idempotency +SELECT type = in_type + AND account_pub = in_account_pub + AND recurrent = in_recurrent +INTO idempotent +FROM prepared_in +WHERE authorization_pub = in_authorization_pub; + +-- Check idempotency and delay garbage collection +IF FOUND AND idempotent THEN + UPDATE prepared_in + SET registered_at=in_timestamp + WHERE authorization_pub=in_authorization_pub; + RETURN; +END IF; + +-- Check reserve pub reuse +out_reserve_pub_reuse=in_type = 'reserve' AND ( + EXISTS(SELECT FROM taler_in WHERE metadata = in_account_pub AND type = 'reserve') + OR EXISTS(SELECT FROM prepared_in WHERE account_pub = in_account_pub AND type = 'reserve' AND authorization_pub != in_authorization_pub) +); +IF out_reserve_pub_reuse THEN + RETURN; +END IF; + +IF in_recurrent THEN + -- Finalize one pending right now + WITH moved_tx AS ( + DELETE FROM pending_recurrent_in + WHERE tx_in_id = ( + SELECT tx_in_id + FROM pending_recurrent_in + JOIN tx_in USING (tx_in_id) + WHERE authorization_pub = in_authorization_pub + ORDER BY registered_at ASC + LIMIT 1 + ) + RETURNING tx_in_id + ) + INSERT INTO taler_in (tx_in_id, type, metadata, authorization_pub, authorization_sig) + SELECT moved_tx.tx_in_id, in_type, in_account_pub, in_authorization_pub, in_authorization_sig + FROM moved_tx + RETURNING tx_in_id INTO talerable_tx; + IF talerable_tx IS NOT NULL THEN + PERFORM pg_notify('taler_in', talerable_tx::text); + END IF; +ELSE + -- Bounce all pending + PERFORM bounce_pending(in_authorization_pub, in_timestamp); +END IF; + +-- Upsert registration +INSERT INTO prepared_in ( + type, + account_pub, + authorization_pub, + authorization_sig, + recurrent, + registered_at, + tx_in_id +) VALUES ( + in_type, + in_account_pub, + in_authorization_pub, + in_authorization_sig, + in_recurrent, + in_timestamp, + talerable_tx +) ON CONFLICT (authorization_pub) +DO UPDATE SET + type = EXCLUDED.type, + account_pub = EXCLUDED.account_pub, + recurrent = EXCLUDED.recurrent, + registered_at = EXCLUDED.registered_at, + tx_in_id = EXCLUDED.tx_in_id, + authorization_sig = EXCLUDED.authorization_sig; +END $$; + +CREATE FUNCTION delete_prepared_transfers ( + IN in_authorization_pub BYTEA, + IN in_timestamp INT8, + OUT out_found BOOLEAN +) +LANGUAGE plpgsql AS $$ +BEGIN + +-- Bounce all pending +PERFORM bounce_pending(in_authorization_pub, in_timestamp); + +-- Delete registration +DELETE FROM prepared_in +WHERE authorization_pub = in_authorization_pub; +out_found = FOUND; + +END $$; +\ No newline at end of file diff --git a/taler-magnet-bank/src/api.rs b/taler-magnet-bank/src/api.rs @@ -16,21 +16,25 @@ use jiff::Timestamp; use taler_api::{ - api::{TalerApi, revenue::Revenue, wire::WireGateway}, - error::{ApiResult, failure}, - subject::IncomingSubject, + api::{TalerApi, revenue::Revenue, transfer::WireTransferGateway, wire::WireGateway}, + error::{ApiResult, failure, failure_code}, + subject::{IncomingSubject, fmt_in_subject}, }; use taler_common::{ api_common::{SafeU64, safe_u64}, api_params::{History, Page}, api_revenue::RevenueIncomingHistory, + api_transfer::{ + RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject, Unregistration, + }, api_wire::{ AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, TransferState, TransferStatus, }, + db::IncomingType, error_code::ErrorCode, - types::{payto::PaytoURI, utils::date_to_utc_ts}, + types::{payto::PaytoURI, timestamp::TalerTimestamp, utils::date_to_utc_ts}, }; use tokio::sync::watch::Sender; @@ -105,14 +109,12 @@ impl WireGateway for MagnetApi { timestamp: initiated_at.into(), row_id: SafeU64::try_from(id).unwrap(), }), - db::TransferResult::RequestUidReuse => Err(failure( - ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED, - "request_uid used already", - )), - db::TransferResult::WtidReuse => Err(failure( - ErrorCode::BANK_TRANSFER_WTID_REUSED, - "wtid used already", - )), + db::TransferResult::RequestUidReuse => { + Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED)) + } + db::TransferResult::WtidReuse => { + Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED)) + } } } @@ -174,10 +176,12 @@ impl WireGateway for MagnetApi { row_id: safe_u64(row_id), timestamp: date_to_utc_ts(&valued_at).into(), }), - AddIncomingResult::ReservePubReuse => Err(failure( - ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, - "reserve_pub used already".to_owned(), - )), + AddIncomingResult::ReservePubReuse => { + Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) + } + AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { + unreachable!("mapping not used") + } } } @@ -201,10 +205,12 @@ impl WireGateway for MagnetApi { row_id: safe_u64(row_id), timestamp: date_to_utc_ts(&valued_at).into(), }), - AddIncomingResult::ReservePubReuse => Err(failure( - ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, - "reserve_pub used already".to_owned(), - )), + AddIncomingResult::ReservePubReuse => { + Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) + } + AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { + unreachable!("mapping not used") + } } } @@ -224,3 +230,42 @@ impl Revenue for MagnetApi { }) } } + +impl WireTransferGateway for MagnetApi { + fn supported_formats(&self) -> &[SubjectFormat] { + &[SubjectFormat::SIMPLE] + } + + async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> { + match db::transfer_register(&self.pool, &req).await? { + db::RegistrationResult::Success => { + let simple = TransferSubject::Simple { + credit_amount: req.credit_amount, + subject: if req.authorization_pub == req.account_pub && !req.recurrent { + fmt_in_subject(req.r#type.into(), &req.account_pub) + } else { + fmt_in_subject(IncomingType::map, &req.authorization_pub) + }, + }; + ApiResult::Ok(RegistrationResponse { + subjects: vec![simple], + expiration: TalerTimestamp::Never, + }) + } + db::RegistrationResult::ReservePubReuse => { + ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) + } + } + } + + async fn unregistration(&self, req: Unregistration) -> ApiResult<()> { + if !db::transfer_unregister(&self.pool, &req).await? { + Err(failure( + ErrorCode::BANK_TRANSACTION_NOT_FOUND, + format!("Prepared transfer '{}' not found", req.authorization_pub), + )) + } else { + Ok(()) + } + } +} diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs @@ -29,6 +29,7 @@ use taler_common::{ api_common::{HashCode, ShortHashCode}, api_params::{History, Page}, api_revenue::RevenueIncomingBankTransaction, + api_transfer::{RegistrationRequest, Unregistration}, api_wire::{ IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState, TransferStatus, @@ -187,10 +188,13 @@ pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> { pub enum AddIncomingResult { Success { new: bool, + pending: bool, row_id: u64, valued_at: Date, }, ReservePubReuse, + UnknownMapping, + MappingReuse, } pub async fn register_tx_in_admin( @@ -201,7 +205,7 @@ pub async fn register_tx_in_admin( serialized!( sqlx::query( " - SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new + SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5) ", ) @@ -215,11 +219,16 @@ pub async fn register_tx_in_admin( .try_map(|r: PgRow| { Ok(if r.try_get_flag(0)? { AddIncomingResult::ReservePubReuse + } else if r.try_get_flag(1)? { + AddIncomingResult::MappingReuse + } else if r.try_get_flag(2)? { + AddIncomingResult::UnknownMapping } else { AddIncomingResult::Success { - row_id: r.try_get_u64(1)?, - valued_at: r.try_get_date(2)?, - new: r.try_get(3)?, + row_id: r.try_get_u64(3)?, + valued_at: r.try_get_date(4)?, + new: r.try_get(5)?, + pending: r.try_get(6)? } }) }) @@ -236,7 +245,7 @@ pub async fn register_tx_in( serialized!( sqlx::query( " - SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new + SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9) ", ) @@ -252,11 +261,16 @@ pub async fn register_tx_in( .try_map(|r: PgRow| { Ok(if r.try_get_flag(0)? { AddIncomingResult::ReservePubReuse + } else if r.try_get_flag(1)? { + AddIncomingResult::MappingReuse + } else if r.try_get_flag(2)? { + AddIncomingResult::UnknownMapping } else { AddIncomingResult::Success { - row_id: r.try_get_u64(1)?, - valued_at: r.try_get_date(2)?, - new: r.try_get(3)?, + row_id: r.try_get_u64(3)?, + valued_at: r.try_get_date(4)?, + new: r.try_get(5)?, + pending: r.try_get(6)? } }) }) @@ -435,7 +449,6 @@ pub struct BounceResult { pub async fn register_bounce_tx_in( db: &mut PgConnection, tx: &TxIn, - amount: &Amount, reason: &str, now: &Timestamp, ) -> sqlx::Result<BounceResult> { @@ -443,7 +456,7 @@ pub async fn register_bounce_tx_in( sqlx::query( " SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new - FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9) + FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8) ", ) .bind(tx.code as i64) @@ -452,7 +465,6 @@ pub async fn register_bounce_tx_in( .bind(tx.debtor.iban()) .bind(&tx.debtor.name) .bind_date(&tx.value_date) - .bind(amount) .bind(reason) .bind_timestamp(now) .try_map(|r: PgRow| { @@ -821,6 +833,47 @@ pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) - Ok(()) } +pub enum RegistrationResult { + Success, + ReservePubReuse, +} + +pub async fn transfer_register( + db: &PgPool, + req: &RegistrationRequest, +) -> sqlx::Result<RegistrationResult> { + let ty: IncomingType = req.r#type.into(); + serialized!( + sqlx::query( + "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)" + ) + .bind(ty) + .bind(&req.account_pub) + .bind(&req.authorization_pub) + .bind(&req.authorization_sig) + .bind(req.recurrent) + .bind_timestamp(&Timestamp::now()) + .try_map(|r: PgRow| { + Ok(if r.try_get_flag("out_reserve_pub_reuse")? { + RegistrationResult::ReservePubReuse + } else { + RegistrationResult::Success + }) + }) + .fetch_one(db) + ) +} + +pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> { + serialized!( + sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)") + .bind(&req.authorization_pub) + .bind_timestamp(&Timestamp::now()) + .try_map(|r: PgRow| r.try_get_flag("out_found")) + .fetch_one(db) + ) +} + #[cfg(test)] mod test { use jiff::{Span, Timestamp, Zoned}; @@ -911,6 +964,7 @@ mod test { .expect("register tx in"), AddIncomingResult::Success { new: true, + pending: false, row_id: id, valued_at: date } @@ -930,6 +984,7 @@ mod test { .expect("register tx in"), AddIncomingResult::Success { new: false, + pending: false, row_id: id, valued_at: date } @@ -950,6 +1005,7 @@ mod test { .expect("register tx in"), AddIncomingResult::Success { new: true, + pending: false, row_id: id + 1, valued_at: later } @@ -1032,6 +1088,7 @@ mod test { .expect("register tx in"), AddIncomingResult::Success { new: true, + pending: false, row_id: 1, valued_at: date } @@ -1043,6 +1100,7 @@ mod test { .expect("register tx in"), AddIncomingResult::Success { new: false, + pending: false, row_id: 1, valued_at: date } @@ -1062,6 +1120,7 @@ mod test { .expect("register tx in"), AddIncomingResult::Success { new: true, + pending: false, row_id: 2, valued_at: date } @@ -1277,7 +1336,7 @@ mod test { status: TxStatus::Completed, }; assert_eq!( - db::register_bounce_tx_in(&mut db, &tx, &tx.amount, "no reason", &now) + db::register_bounce_tx_in(&mut db, &tx, "no reason", &now) .await .unwrap(), BounceResult { @@ -1437,6 +1496,7 @@ mod test { .expect("register tx in"), AddIncomingResult::Success { new: true, + pending: false, row_id: 1, valued_at: date } @@ -1454,7 +1514,6 @@ mod test { value_date: date, status: TxStatus::Completed }, - &amount, "good reason", &now ) @@ -1479,7 +1538,6 @@ mod test { value_date: date, status: TxStatus::Completed }, - &amount, "good reason", &now ) @@ -1505,7 +1563,6 @@ mod test { value_date: date, status: TxStatus::Completed }, - &amount, "good reason", &now ) @@ -1530,7 +1587,6 @@ mod test { value_date: date, status: TxStatus::Completed }, - &amount, "good reason", &now ) diff --git a/taler-magnet-bank/src/worker.rs b/taler-magnet-bank/src/worker.rs @@ -217,13 +217,14 @@ impl Worker<'_> { trace!(target: "worker", "in {tx_in} already skip bounce "); } } - AddIncomingResult::ReservePubReuse => unreachable!(), + AddIncomingResult::ReservePubReuse + | AddIncomingResult::UnknownMapping + | AddIncomingResult::MappingReuse => unreachable!(), } } else { let res = db::register_bounce_tx_in( db, &tx_in, - &tx_in.amount, reason, &Timestamp::now(), ) @@ -265,6 +266,12 @@ impl Worker<'_> { AddIncomingResult::ReservePubReuse => { bounce(self.db, "reserve pub reuse").await? } + AddIncomingResult::UnknownMapping => { + bounce(self.db, "unknown mapping").await? + } + AddIncomingResult::MappingReuse => { + bounce(self.db, "mapping reuse").await? + } }, Err(e) => bounce(self.db, &e.to_string()).await?, } @@ -280,7 +287,9 @@ impl Worker<'_> { trace!(target: "worker", "in {tx_in} already seen"); } } - AddIncomingResult::ReservePubReuse => unreachable!(), + AddIncomingResult::ReservePubReuse + | AddIncomingResult::UnknownMapping + | AddIncomingResult::MappingReuse => unreachable!(), } } } diff --git a/taler-magnet-bank/tests/api.rs b/taler-magnet-bank/tests/api.rs @@ -14,41 +14,55 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use jiff::{Timestamp, Zoned}; -use sqlx::PgPool; -use taler_api::{api::TalerRouter as _, auth::AuthMethod, subject::OutgoingSubject}; +use sqlx::{PgPool, Row as _, postgres::PgRow}; +use taler_api::{ + api::TalerRouter as _, + auth::AuthMethod, + db::TypeHelper as _, + subject::{IncomingSubject, OutgoingSubject}, +}; use taler_common::{ + api_common::EddsaPublicKey, api_revenue::RevenueConfig, + api_transfer::WireTransferConfig, api_wire::{OutgoingHistory, TransferState, WireConfig}, - types::{amount::amount, payto::payto}, + db::IncomingType, + types::{ + amount::amount, + payto::{PaytoURI, payto}, + }, }; use taler_magnet_bank::{ + FullHuPayto, api::MagnetApi, constants::CONFIG_SOURCE, - db::{self, TxOutKind}, + db::{self, AddIncomingResult, TxIn, TxOutKind}, magnet_api::types::TxStatus, magnet_payto, }; use taler_test_utils::{ Router, db::db_test_setup, - routine::{admin_add_incoming_routine, revenue_routine, routine_pagination, transfer_routine}, + routine::{ + Status, admin_add_incoming_routine, registration_routine, revenue_routine, + routine_pagination, transfer_routine, + }, server::TestServer, }; +static PAYTO: LazyLock<FullHuPayto> = + LazyLock::new(|| magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name")); +static ACCOUNT: LazyLock<PaytoURI> = LazyLock::new(|| PAYTO.as_payto()); + async fn setup() -> (Router, PgPool) { let (_, pool) = db_test_setup(CONFIG_SOURCE).await; - let api = Arc::new( - MagnetApi::start( - pool.clone(), - payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), - ) - .await, - ); + let api = Arc::new(MagnetApi::start(pool.clone(), ACCOUNT.clone()).await); let server = Router::new() .wire_gateway(api.clone(), AuthMethod::None) + .wire_transfer_gateway(api.clone()) .revenue(api, AuthMethod::None) .finalize(); @@ -63,6 +77,10 @@ async fn config() { .await .assert_ok_json::<WireConfig>(); server + .get("/taler-wire-transfer-gateway/config") + .await + .assert_ok_json::<WireTransferConfig>(); + server .get("/taler-revenue/config") .await .assert_ok_json::<RevenueConfig>(); @@ -100,9 +118,7 @@ async fn outgoing_history() { code: i as u64, amount: amount("EUR:10"), subject: "subject".to_owned(), - creditor: magnet_payto( - "payto://iban/HU30162000031000163100000000?receiver-name=name", - ), + creditor: PAYTO.clone(), value_date: now, status: TxStatus::Completed, }, @@ -119,21 +135,88 @@ async fn outgoing_history() { #[tokio::test] async fn admin_add_incoming() { let (server, _) = setup().await; - admin_add_incoming_routine( - &server, - &payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), - true, - ) - .await; + admin_add_incoming_routine(&server, &ACCOUNT, true).await; } #[tokio::test] async fn revenue() { let (server, _) = setup().await; - revenue_routine( + revenue_routine(&server, &ACCOUNT, true).await; +} + +async fn check_in(pool: &PgPool) -> Vec<Status> { + sqlx::query( + " + SELECT pending_recurrent_in.authorization_pub IS NOT NULL, initiated_id IS NOT NULL, type, metadata + FROM tx_in + LEFT JOIN taler_in USING (tx_in_id) + LEFT JOIN pending_recurrent_in USING (tx_in_id) + LEFT JOIN bounced USING (tx_in_id) + ORDER BY tx_in.tx_in_id + ", + ) + .try_map(|r: PgRow| { + Ok( + if r.try_get_flag(0)? { + Status::Pending + } else if r.try_get_flag(1)? { + Status::Bounced + } else { + match r.try_get(2)? { + None => Status::Simple, + Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?), + Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?), + Some(e) => unreachable!("{e:?}") + } + } + ) + }) + .fetch_all(pool) + .await + .unwrap() +} + +pub async fn test_in(pool: &PgPool, key: EddsaPublicKey) { + let tx = TxIn { + code: rand::random_range(10..10000), + amount: amount("EUR:12"), + subject: String::new(), + debtor: PAYTO.clone(), + value_date: Zoned::now().date(), + status: TxStatus::Completed, + }; + let mut db = pool.acquire().await.unwrap(); + let reason = match db::register_tx_in( + &mut db, + &tx, + &Some(IncomingSubject::Map(key)), + &Timestamp::now(), + ) + .await + .unwrap() + { + AddIncomingResult::Success { .. } => return, + AddIncomingResult::ReservePubReuse => "reserve pub reuse", + AddIncomingResult::UnknownMapping => "unknown mapping", + AddIncomingResult::MappingReuse => "mapping reuse", + }; + db::register_bounce_tx_in(&mut db, &tx, reason, &Timestamp::now()) + .await + .unwrap(); +} + +#[tokio::test] +async fn registration() { + let (server, pool) = setup().await; + registration_routine( &server, - &payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), - true, + &ACCOUNT, + || check_in(&pool), + |account_pub| { + let account_pub = account_pub.clone(); + let pool = &pool; + async move { test_in(pool, account_pub).await } + }, ) .await; }