taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit 7de57eb035e1759307b3f170f89b12b145390b31
parent 022ece563e170130a5910c0465cb8e8ec5c74f93
Author: Antoine A <>
Date:   Fri,  3 Apr 2026 10:23:12 +0200

apns: improve worker logic

Diffstat:
Mcommon/taler-common/src/lib.rs | 11++++++-----
Mtaler-apns-relay/src/apns.rs | 2+-
Mtaler-apns-relay/src/db.rs | 2+-
Mtaler-apns-relay/src/worker.rs | 142++++++++++++++++++++++++++++++++++++++++++++++---------------------------------
4 files changed, 91 insertions(+), 66 deletions(-)

diff --git a/common/taler-common/src/lib.rs b/common/taler-common/src/lib.rs @@ -17,6 +17,7 @@ use std::{path::PathBuf, time::Duration}; use config::{Config, parser::ConfigSource}; +use jiff::SignedDuration; use tracing::error; use tracing_subscriber::util::SubscriberInitExt; @@ -86,12 +87,12 @@ pub struct ExpoBackoffDecorr { } impl ExpoBackoffDecorr { - pub fn new(base: u32, max: u32, factor: f32) -> Self { + pub fn new(base: Duration, max: Duration, factor: f32) -> Self { Self { - base, - max, + base: base.as_millis() as u32, + max: max.as_millis() as u32, factor, - sleep: base, + sleep: base.as_millis() as u32, } } @@ -108,6 +109,6 @@ impl ExpoBackoffDecorr { impl Default for ExpoBackoffDecorr { fn default() -> Self { - Self::new(400, 30 * 1000, 2.5) + Self::new(Duration::from_millis(400), Duration::from_secs(30), 2.5) } } diff --git a/taler-apns-relay/src/apns.rs b/taler-apns-relay/src/apns.rs @@ -43,7 +43,7 @@ pub struct ApnsErrorBody { pub timestamp: Option<u64>, } -#[derive(Debug, Clone, PartialEq, Eq, strum_macros::AsRefStr, strum_macros::Display)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, strum_macros::AsRefStr, strum_macros::Display)] pub enum Reason { BadCollapseId, BadDeviceToken, diff --git a/taler-apns-relay/src/db.rs b/taler-apns-relay/src/db.rs @@ -126,7 +126,7 @@ pub mod test { assert_eq!(all_registrations(&db).await.unwrap(), &[token2]); // Idempotent - unregister(&db, &token1, &now).await.unwrap(); + unregister(&db, token1, &now).await.unwrap(); assert_eq!(all_registrations(&db).await.unwrap(), &[token2]); // Skip reregistered diff --git a/taler-apns-relay/src/worker.rs b/taler-apns-relay/src/worker.rs @@ -20,7 +20,7 @@ use jiff::Timestamp; use sqlx::PgPool; use taler_common::ExpoBackoffDecorr; use taler_common::config::Config; -use tracing::{error, info}; +use tracing::{error, info, trace, warn}; use crate::{ apns::{ApnsError, Client, Reason}, @@ -36,71 +36,95 @@ pub enum WorkerError { Apns(#[from] ApnsError), } +const NB_RETRY: usize = 5; + async fn wakeup(pool: &PgPool, client: &mut Client) -> Result<(), WorkerError> { let tokens = db::all_registrations(pool).await?; info!(target: "worker", "send notification to {} devices", tokens.len()); + // Wait at least 15 minutes before retrying API calls as per: https://developer.apple.com/documentation/usernotifications/sending-notification-requests-to-apns#Follow-best-practices-while-sending-push-notifications-with-APNs + // After 15 minutes, you can retry JSON payloads that receive response status codes that begin with 5XX. + // While retrying, you may employ a back-off technique. Most notifications with the status code 4XX can + // be retried after you fix the error noted in the reason field. Don’t retry notification responses with + // the error code BadDeviceToken, DeviceTokenNotForTopic, Forbidden, ExpiredToken, Unregistered, or PayloadTooLarge. + // You can retry with a delay, if you get the error code TooManyRequests. + let mut jitter = ExpoBackoffDecorr::new(Duration::from_mins(15), Duration::from_hours(1), 2.5); + + // TODO paginate tokens + // TODO send in batches + for token in tokens { - let res = client.send(&token).await; - if let Err(ApnsError::Err { reason, timestamp }) = &res { - match reason { - // Fatal error - Reason::BadCollapseId - | Reason::BadDeviceToken - | Reason::BadExpirationDate - | Reason::BadMessageId - | Reason::BadPriority - | Reason::BadTopic - | Reason::DuplicateHeaders - | Reason::InvalidPushType - | Reason::MissingDeviceToken - | Reason::MissingTopic - | Reason::PayloadEmpty - | Reason::BadPath - | Reason::MethodNotAllowed => { - tracing::error!(target: "worker", "fatal error the service is broken: {}",res.unwrap_err()); - std::process::exit(9); - } - // Config error - Reason::TopicDisallowed - | Reason::BadCertificate - | Reason::BadCertificateEnvironment - | Reason::ExpiredProviderToken - | Reason::Forbidden - | Reason::InvalidProviderToken - | Reason::MissingProviderToken - | Reason::UnrelatedKeyIdInToken - | Reason::BadEnvironmentKeyIdInToken - | Reason::PayloadTooLarge => { - tracing::error!(target: "worker", "config error, check the configuration: {}",res.unwrap_err()); - std::process::exit(9); - } - // Unregister - Reason::DeviceTokenNotForTopic | Reason::ExpiredToken | Reason::Unregistered => { - db::unregister( - pool, - &token, - &timestamp - .and_then(|s| Timestamp::from_second(s as i64).ok()) - .unwrap_or_else(Timestamp::now), - ) - .await?; - } - // Wait before retry - Reason::IdleTimeout - | Reason::TooManyProviderTokenUpdates - | Reason::TooManyRequests => { - tokio::time::sleep(Duration::from_mins(15)).await; - } - // Restart loop - Reason::InternalServerError | Reason::ServiceUnavailable | Reason::Shutdown => { - tokio::time::sleep(Duration::from_mins(15)).await; - res?; + trace!(target: "worker", "send background notification to {token}"); + let mut attempt = 0; + while let Err(e) = client.send(&token).await { + trace!(target: "worker", "notification to {token} failed {e}"); + if let ApnsError::Err { reason, timestamp } = e { + match reason { + // Fatal error + Reason::BadCollapseId + | Reason::BadDeviceToken + | Reason::BadExpirationDate + | Reason::BadMessageId + | Reason::BadPriority + | Reason::BadTopic + | Reason::DuplicateHeaders + | Reason::InvalidPushType + | Reason::MissingDeviceToken + | Reason::MissingTopic + | Reason::PayloadEmpty + | Reason::BadPath + | Reason::MethodNotAllowed => { + error!(target: "worker", "fatal error the service is broken: {e}"); + std::process::exit(9); + } + // Config error + Reason::TopicDisallowed + | Reason::BadCertificate + | Reason::BadCertificateEnvironment + | Reason::ExpiredProviderToken + | Reason::Forbidden + | Reason::InvalidProviderToken + | Reason::MissingProviderToken + | Reason::UnrelatedKeyIdInToken + | Reason::BadEnvironmentKeyIdInToken + | Reason::PayloadTooLarge => { + error!(target: "worker", "config error, check the configuration: {e}"); + std::process::exit(9); + } + // Unregister + Reason::DeviceTokenNotForTopic + | Reason::ExpiredToken + | Reason::Unregistered => { + db::unregister( + pool, + &token, + &timestamp + .and_then(|s| Timestamp::from_second(s as i64).ok()) + .unwrap_or_else(Timestamp::now), + ) + .await?; + break; + } + // Wait before retry + Reason::IdleTimeout + | Reason::TooManyProviderTokenUpdates + | Reason::TooManyRequests + | Reason::InternalServerError + | Reason::ServiceUnavailable + | Reason::Shutdown => {} } } - } else { - res?; + + attempt += 1; + if attempt >= NB_RETRY { + error!(target: "worker", "notification to {token} failed more than {NB_RETRY} times, stopping worker"); + return Err(WorkerError::Apns(e)); + } else { + warn!(target: "worker", "notification to {token} failed {e}, retrying after back-off"); + tokio::time::sleep(jitter.backoff()).await; + } } + info!(target: "trace", "notification to {token} succeeded"); } Ok(()) @@ -118,7 +142,7 @@ pub async fn run(cfg: &Config, pool: &PgPool, transient: bool) -> anyhow::Result info!(target: "worker", "running at initialisation"); loop { - while let Err(e) = wakeup(pool, &mut client).await { + while let Err(WorkerError::Db(e)) = wakeup(pool, &mut client).await { error!(target: "worker", "{e}"); tokio::time::sleep(jitter.backoff()).await; }