diff --git a/.sqlx/query-5bee4ed5296667e4ca7e1a97aec28d30a470b8aee7b378ec9ca4e34de4faf349.json b/.sqlx/query-5bee4ed5296667e4ca7e1a97aec28d30a470b8aee7b378ec9ca4e34de4faf349.json new file mode 100644 index 0000000..4819874 --- /dev/null +++ b/.sqlx/query-5bee4ed5296667e4ca7e1a97aec28d30a470b8aee7b378ec9ca4e34de4faf349.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE comms_queue\n SET status = 'failed'::comms_status,\n attempts = max_attempts,\n last_error = $2,\n updated_at = NOW()\n WHERE id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [] + }, + "hash": "5bee4ed5296667e4ca7e1a97aec28d30a470b8aee7b378ec9ca4e34de4faf349" +} diff --git a/.sqlx/query-890aa92acdcb0fe2a3bf04d87e1f16a801d271da7cedc32fc42c2ef5b100faae.json b/.sqlx/query-890aa92acdcb0fe2a3bf04d87e1f16a801d271da7cedc32fc42c2ef5b100faae.json new file mode 100644 index 0000000..6d2742a --- /dev/null +++ b/.sqlx/query-890aa92acdcb0fe2a3bf04d87e1f16a801d271da7cedc32fc42c2ef5b100faae.json @@ -0,0 +1,158 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE comms_queue\n SET status = 'processing', updated_at = NOW()\n WHERE id IN (\n SELECT id FROM comms_queue\n WHERE attempts < max_attempts\n AND scheduled_for <= $1\n AND (\n status = 'pending'\n OR (status = 'processing'\n AND updated_at < $1 - INTERVAL '10 minutes')\n )\n ORDER BY scheduled_for ASC\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n )\n RETURNING\n id, user_id,\n channel as \"channel: CommsChannel\",\n comms_type as \"comms_type: CommsType\",\n status as \"status: CommsStatus\",\n recipient, subject, body, metadata,\n attempts, max_attempts, last_error,\n created_at, updated_at, scheduled_for, processed_at", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "user_id", + "type_info": "Uuid" + }, + { + "ordinal": 2, + "name": "channel: CommsChannel", + "type_info": { + "Custom": { + "name": "comms_channel", + "kind": { + "Enum": [ + "email", + "discord", + "telegram", + "signal" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "comms_type: CommsType", + "type_info": { + "Custom": { + "name": "comms_type", + "kind": { + "Enum": [ + "welcome", + "email_verification", + "password_reset", + "email_update", + "account_deletion", + "admin_email", + "plc_operation", + "two_factor_code", + "channel_verification", + "passkey_recovery", + "legacy_login_alert", + "migration_verification", + "channel_verified" + ] + } + } + } + }, + { + "ordinal": 4, + "name": "status: CommsStatus", + "type_info": { + "Custom": { + "name": "comms_status", + "kind": { + "Enum": [ + "pending", + "processing", + "sent", + "failed" + ] + } + } + } + }, + { + "ordinal": 5, + "name": "recipient", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "subject", + "type_info": "Text" + }, + { + "ordinal": 7, + "name": "body", + "type_info": "Text" + }, + { + "ordinal": 8, + "name": "metadata", + "type_info": "Jsonb" + }, + { + "ordinal": 9, + "name": "attempts", + "type_info": "Int4" + }, + { + "ordinal": 10, + "name": "max_attempts", + "type_info": "Int4" + }, + { + "ordinal": 11, + "name": "last_error", + "type_info": "Text" + }, + { + "ordinal": 12, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 13, + "name": "updated_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 14, + "name": "scheduled_for", + "type_info": "Timestamptz" + }, + { + "ordinal": 15, + "name": "processed_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + true, + false, + true, + false, + false, + true, + false, + false, + false, + true + ] + }, + "hash": "890aa92acdcb0fe2a3bf04d87e1f16a801d271da7cedc32fc42c2ef5b100faae" +} diff --git a/crates/tranquil-comms/src/email/mod.rs b/crates/tranquil-comms/src/email/mod.rs new file mode 100644 index 0000000..3c57238 --- /dev/null +++ b/crates/tranquil-comms/src/email/mod.rs @@ -0,0 +1,194 @@ +pub mod dkim; +pub mod message; +mod mx; +pub mod transport; +pub mod types; + +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use hickory_resolver::TokioAsyncResolver; +use lettre::message::Mailbox; +use lettre::transport::smtp::AsyncSmtpTransport; +use lettre::transport::smtp::PoolConfig; +use lettre::transport::smtp::authentication::Credentials; +use lettre::transport::smtp::extension::ClientId; +use tokio::sync::Semaphore; +use tracing::{info, warn}; + +pub use self::dkim::DkimSigner; +pub use self::transport::SendMode; +use self::types::{ + DkimKeyPath, DkimSelector, EmailDomain, HeloName, SmtpHost, SmtpPassword, SmtpPort, + SmtpUsername, TlsMode, +}; +use crate::sender::{CommsSender, SendError}; +use crate::types::{CommsChannel, QueuedComms}; + +pub struct EmailSender { + from: Mailbox, + mode: SendMode, + dkim: Option, +} + +impl EmailSender { + pub fn new(from: Mailbox, mode: SendMode, dkim: Option) -> Self { + Self { from, mode, dkim } + } + + pub fn from_config(cfg: &tranquil_config::TranquilConfig) -> Result, SendError> { + let Some(from_address) = cfg.email.from_address.as_deref().filter(|s| !s.is_empty()) else { + info!("Email sender disabled: MAIL_FROM_ADDRESS unset"); + return Ok(None); + }; + let from = build_from(&cfg.email.from_name, from_address)?; + let dkim = build_dkim(&cfg.email.dkim)?; + let mode = match cfg + .email + .smarthost + .host + .as_deref() + .filter(|h| !h.is_empty()) + { + Some(host) => build_smarthost(cfg, host)?, + None => build_direct_mx(cfg)?, + }; + info!(?mode, dkim = dkim.is_some(), "Email sender initialized"); + Ok(Some(Self { from, mode, dkim })) + } +} + +fn config_invalid(field: &str, error: impl std::fmt::Display) -> SendError { + SendError::ConfigInvalid(format!("{field}: {error}")) +} + +fn build_from(from_name: &str, from_address: &str) -> Result { + let raw = match from_name.is_empty() { + true => from_address.to_string(), + false => format!("\"{}\" <{}>", from_name.replace('"', "'"), from_address), + }; + raw.parse::() + .map_err(|e| config_invalid("MAIL_FROM_ADDRESS / MAIL_FROM_NAME", e)) +} + +fn build_smarthost( + cfg: &tranquil_config::TranquilConfig, + host_raw: &str, +) -> Result { + let host = SmtpHost::parse(host_raw).map_err(|e| config_invalid("MAIL_SMARTHOST_HOST", e))?; + let port = SmtpPort::parse(cfg.email.smarthost.port) + .map_err(|e| config_invalid("MAIL_SMARTHOST_PORT", e))?; + let tls = TlsMode::parse(&cfg.email.smarthost.tls) + .map_err(|e| config_invalid("MAIL_SMARTHOST_TLS", e))?; + let helo = resolve_helo(cfg)?; + let pool = PoolConfig::new() + .max_size(cfg.email.smarthost.pool_size) + .idle_timeout(Duration::from_secs(60)); + let command_timeout = Duration::from_secs(cfg.email.smarthost.command_timeout_secs); + let total_timeout = Duration::from_secs(cfg.email.smarthost.total_timeout_secs); + + let builder = match tls { + TlsMode::Implicit => AsyncSmtpTransport::::relay(host.as_str()) + .map_err(|e| config_invalid("smarthost TLS setup", e))?, + TlsMode::Starttls => { + AsyncSmtpTransport::::starttls_relay(host.as_str()) + .map_err(|e| config_invalid("smarthost TLS setup", e))? + } + TlsMode::None => { + AsyncSmtpTransport::::builder_dangerous(host.as_str()) + } + }; + let builder = builder + .port(port.as_u16()) + .hello_name(ClientId::Domain(helo.into_inner())) + .timeout(Some(command_timeout)) + .pool_config(pool); + let builder = match ( + cfg.email.smarthost.username.as_deref(), + cfg.email.smarthost.password.as_deref(), + ) { + (Some(u), Some(p)) => { + let username = + SmtpUsername::parse(u).map_err(|e| config_invalid("MAIL_SMARTHOST_USERNAME", e))?; + let password = + SmtpPassword::parse(p).map_err(|e| config_invalid("MAIL_SMARTHOST_PASSWORD", e))?; + builder.credentials(Credentials::new( + username.into_inner(), + password.expose().to_string(), + )) + } + _ => builder, + }; + Ok(SendMode::Smarthost { + transport: Box::new(builder.build()), + total_timeout, + }) +} + +fn build_direct_mx(cfg: &tranquil_config::TranquilConfig) -> Result { + let helo = resolve_helo(cfg)?; + let resolver = TokioAsyncResolver::tokio_from_system_conf() + .map(Arc::new) + .map_err(|e| config_invalid("system DNS configuration", e))?; + let max_concurrent = cfg.email.direct_mx.max_concurrent_sends.max(1); + Ok(SendMode::DirectMx { + resolver, + helo, + command_timeout: Duration::from_secs(cfg.email.direct_mx.command_timeout_secs), + total_timeout: Duration::from_secs(cfg.email.direct_mx.total_timeout_secs), + require_tls: cfg.email.direct_mx.require_tls, + inflight: Arc::new(Semaphore::new(max_concurrent)), + }) +} + +fn resolve_helo(cfg: &tranquil_config::TranquilConfig) -> Result { + let raw = cfg + .email + .helo_name + .clone() + .unwrap_or_else(|| cfg.server.hostname_without_port().to_string()); + HeloName::parse(&raw).map_err(|e| config_invalid(&format!("HELO name {raw:?}"), e)) +} + +fn build_dkim(cfg: &tranquil_config::DkimConfig) -> Result, SendError> { + let selector = match cfg.selector.as_deref() { + Some(s) => s, + None => return Ok(None), + }; + let domain = cfg + .domain + .as_deref() + .ok_or_else(|| SendError::DkimSign("MAIL_DKIM_DOMAIN required when selector set".into()))?; + let key_path = cfg.private_key_path.as_deref().ok_or_else(|| { + SendError::DkimSign("MAIL_DKIM_KEY_PATH required when selector set".into()) + })?; + let selector = DkimSelector::parse(selector) + .map_err(|e| SendError::DkimSign(format!("invalid DKIM selector: {e}")))?; + let domain = EmailDomain::parse(domain) + .map_err(|e| SendError::DkimSign(format!("invalid DKIM domain: {e}")))?; + let path = DkimKeyPath::parse(key_path) + .map_err(|e| SendError::DkimSign(format!("DKIM key path invalid: {e}")))?; + DkimSigner::load(selector, domain, path).map(Some) +} + +#[async_trait] +impl CommsSender for EmailSender { + fn channel(&self) -> CommsChannel { + CommsChannel::Email + } + + async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { + let mut message = message::build(&self.from, notification)?; + if let Some(signer) = &self.dkim { + signer.sign(&mut message); + } + match transport::dispatch(&self.mode, message).await { + Ok(()) => Ok(()), + Err(e) => { + warn!(comms_id = %notification.id, error = %e, "SMTP send failed"); + Err(e) + } + } + } +} diff --git a/crates/tranquil-comms/src/lib.rs b/crates/tranquil-comms/src/lib.rs index 5c5824e..425c9b5 100644 --- a/crates/tranquil-comms/src/lib.rs +++ b/crates/tranquil-comms/src/lib.rs @@ -1,13 +1,15 @@ +pub mod email; mod locale; mod sender; mod types; +pub use email::EmailSender; pub use locale::{ DEFAULT_LOCALE, NotificationStrings, VALID_LOCALES, format_message, get_strings, validate_locale, }; pub use sender::{ - CommsSender, DiscordSender, EmailSender, SendError, SignalSender, TelegramSender, - is_valid_phone_number, is_valid_signal_username, mime_encode_header, sanitize_header_value, + CommsSender, DiscordSender, SendError, SignalSender, TelegramSender, is_valid_phone_number, + is_valid_signal_username, }; pub use types::{CommsChannel, CommsStatus, CommsType, NewComms, QueuedComms}; diff --git a/crates/tranquil-comms/src/sender.rs b/crates/tranquil-comms/src/sender.rs index cf5a9bc..ec08797 100644 --- a/crates/tranquil-comms/src/sender.rs +++ b/crates/tranquil-comms/src/sender.rs @@ -1,11 +1,7 @@ use async_trait::async_trait; -use base64::{Engine, engine::general_purpose::STANDARD as BASE64}; use reqwest::Client; use serde_json::json; -use std::process::Stdio; use std::time::Duration; -use tokio::io::AsyncWriteExt; -use tokio::process::Command; use super::types::{CommsChannel, QueuedComms}; @@ -21,25 +17,51 @@ pub trait CommsSender: Send + Sync { #[derive(Debug, thiserror::Error)] pub enum SendError { - #[error("Failed to spawn {command}: {source}")] - ProcessSpawn { - command: String, - source: std::io::Error, - }, - #[error("{command} exited with non-zero status: {detail}")] - ProcessFailed { command: String, detail: String }, #[error("Channel not configured: {0:?}")] NotConfigured(CommsChannel), - #[error("External service error: {0}")] - ExternalService(String), + #[error("Email configuration invalid: {0}")] + ConfigInvalid(String), #[error("Invalid recipient format: {0}")] InvalidRecipient(String), + #[error("Message construction failed: {0}")] + MessageBuild(String), + #[error("transient DNS lookup failure: {0}")] + DnsTransient(String), + #[error("permanent DNS lookup failure: {0}")] + DnsPermanent(String), + #[error("SMTP transient error: {0}")] + SmtpTransient(String), + #[error("SMTP permanent error: {0}")] + SmtpPermanent(String), + #[error("DKIM signing failed: {0}")] + DkimSign(String), + #[error("External service error: {0}")] + ExternalService(String), #[error("Request timeout")] Timeout, #[error("Max retries exceeded: {0}")] MaxRetriesExceeded(String), } +impl SendError { + pub fn is_permanent(&self) -> bool { + match self { + Self::SmtpPermanent(_) + | Self::DnsPermanent(_) + | Self::InvalidRecipient(_) + | Self::MessageBuild(_) + | Self::DkimSign(_) + | Self::ConfigInvalid(_) => true, + Self::SmtpTransient(_) + | Self::DnsTransient(_) + | Self::Timeout + | Self::ExternalService(_) + | Self::MaxRetriesExceeded(_) + | Self::NotConfigured(_) => false, + } + } +} + fn create_http_client() -> Client { Client::builder() .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS)) @@ -100,19 +122,6 @@ where )) } -pub fn sanitize_header_value(value: &str) -> String { - value.replace(['\r', '\n'], " ").trim().to_string() -} - -pub fn mime_encode_header(value: &str) -> String { - if value.is_ascii() { - sanitize_header_value(value) - } else { - let sanitized = sanitize_header_value(value); - format!("=?UTF-8?B?{}?=", BASE64.encode(sanitized.as_bytes())) - } -} - pub fn escape_html(text: &str) -> String { text.replace('&', "&") .replace('<', "<") @@ -135,93 +144,6 @@ pub fn is_valid_signal_username(username: &str) -> bool { tranquil_signal::SignalUsername::parse(username).is_ok() } -pub struct EmailSender { - from_address: String, - from_name: String, - sendmail_path: String, -} - -impl EmailSender { - pub fn new(from_address: String, from_name: String, sendmail_path: String) -> Self { - Self { - from_address, - from_name, - sendmail_path, - } - } - - pub fn from_config(cfg: &tranquil_config::TranquilConfig) -> Option { - let from_address = cfg.email.from_address.clone()?; - let from_name = cfg.email.from_name.clone(); - let sendmail_path = cfg.email.sendmail_path.clone(); - Some(Self::new(from_address, from_name, sendmail_path)) - } - - pub fn format_email(&self, notification: &QueuedComms) -> String { - let subject = mime_encode_header(notification.subject.as_deref().unwrap_or("Notification")); - let recipient = sanitize_header_value(¬ification.recipient); - let from_header = if self.from_name.is_empty() { - self.from_address.clone() - } else { - format!( - "{} <{}>", - sanitize_header_value(&self.from_name), - self.from_address - ) - }; - format!( - "From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}", - from_header, recipient, subject, notification.body - ) - } -} - -#[async_trait] -impl CommsSender for EmailSender { - fn channel(&self) -> CommsChannel { - CommsChannel::Email - } - - async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { - let email_content = self.format_email(notification); - let mut child = Command::new(&self.sendmail_path) - .arg("-t") - .arg("-oi") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .map_err(|e| SendError::ProcessSpawn { - command: self.sendmail_path.clone(), - source: e, - })?; - if let Some(mut stdin) = child.stdin.take() { - stdin - .write_all(email_content.as_bytes()) - .await - .map_err(|e| SendError::ProcessSpawn { - command: self.sendmail_path.clone(), - source: e, - })?; - } - let output = child - .wait_with_output() - .await - .map_err(|e| SendError::ProcessSpawn { - command: self.sendmail_path.clone(), - source: e, - })?; - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(SendError::ProcessFailed { - command: self.sendmail_path.clone(), - detail: stderr.to_string(), - }); - } - Ok(()) - } -} - const DISCORD_API_BASE: &str = "https://discord.com/api/v10"; #[derive(Clone)] @@ -610,3 +532,28 @@ impl CommsSender for SignalSender { )) } } + +#[cfg(test)] +mod is_permanent_matrix { + use super::{CommsChannel, SendError}; + + #[test] + fn permanent_variants_are_permanent() { + assert!(SendError::SmtpPermanent("x".into()).is_permanent()); + assert!(SendError::DnsPermanent("x".into()).is_permanent()); + assert!(SendError::InvalidRecipient("x".into()).is_permanent()); + assert!(SendError::MessageBuild("x".into()).is_permanent()); + assert!(SendError::DkimSign("x".into()).is_permanent()); + assert!(SendError::ConfigInvalid("x".into()).is_permanent()); + } + + #[test] + fn transient_variants_are_not_permanent() { + assert!(!SendError::SmtpTransient("x".into()).is_permanent()); + assert!(!SendError::DnsTransient("x".into()).is_permanent()); + assert!(!SendError::Timeout.is_permanent()); + assert!(!SendError::ExternalService("x".into()).is_permanent()); + assert!(!SendError::MaxRetriesExceeded("x".into()).is_permanent()); + assert!(!SendError::NotConfigured(CommsChannel::Email).is_permanent()); + } +} diff --git a/crates/tranquil-db-traits/src/infra.rs b/crates/tranquil-db-traits/src/infra.rs index c9879d6..eb337df 100644 --- a/crates/tranquil-db-traits/src/infra.rs +++ b/crates/tranquil-db-traits/src/infra.rs @@ -244,6 +244,8 @@ pub trait InfraRepository: Send + Sync { async fn mark_comms_failed(&self, id: Uuid, error: &str) -> Result<(), DbError>; + async fn mark_comms_failed_permanent(&self, id: Uuid, error: &str) -> Result<(), DbError>; + async fn create_invite_code( &self, code: &str, diff --git a/crates/tranquil-db/src/postgres/infra.rs b/crates/tranquil-db/src/postgres/infra.rs index 5045be5..adac68d 100644 --- a/crates/tranquil-db/src/postgres/infra.rs +++ b/crates/tranquil-db/src/postgres/infra.rs @@ -65,9 +65,13 @@ impl InfraRepository for PostgresInfraRepository { SET status = 'processing', updated_at = NOW() WHERE id IN ( SELECT id FROM comms_queue - WHERE status = 'pending' + WHERE attempts < max_attempts AND scheduled_for <= $1 - AND attempts < max_attempts + AND ( + status = 'pending' + OR (status = 'processing' + AND updated_at < $1 - INTERVAL '10 minutes') + ) ORDER BY scheduled_for ASC LIMIT $2 FOR UPDATE SKIP LOCKED @@ -127,6 +131,24 @@ impl InfraRepository for PostgresInfraRepository { Ok(()) } + async fn mark_comms_failed_permanent(&self, id: Uuid, error: &str) -> Result<(), DbError> { + sqlx::query!( + r#"UPDATE comms_queue + SET status = 'failed'::comms_status, + attempts = max_attempts, + last_error = $2, + updated_at = NOW() + WHERE id = $1"#, + id, + error + ) + .execute(&self.pool) + .await + .map_err(map_sqlx_error)?; + + Ok(()) + } + async fn create_invite_code( &self, code: &str, diff --git a/crates/tranquil-pds/src/comms/mod.rs b/crates/tranquil-pds/src/comms/mod.rs index 981f01f..2342773 100644 --- a/crates/tranquil-pds/src/comms/mod.rs +++ b/crates/tranquil-pds/src/comms/mod.rs @@ -4,7 +4,7 @@ pub use tranquil_comms::{ CommsChannel, CommsSender, CommsStatus, CommsType, DEFAULT_LOCALE, DiscordSender, EmailSender, NewComms, NotificationStrings, QueuedComms, SendError, SignalSender, TelegramSender, VALID_LOCALES, format_message, get_strings, is_valid_phone_number, is_valid_signal_username, - mime_encode_header, sanitize_header_value, validate_locale, + validate_locale, }; pub use service::{CommsService, repo as comms_repo, resolve_delivery_channel}; diff --git a/crates/tranquil-pds/src/comms/service.rs b/crates/tranquil-pds/src/comms/service.rs index a983c02..d9f23b2 100644 --- a/crates/tranquil-pds/src/comms/service.rs +++ b/crates/tranquil-pds/src/comms/service.rs @@ -149,13 +149,19 @@ impl CommsService { } } Err(e) => { + let permanent = e.is_permanent(); let error_msg = e.to_string(); warn!( comms_id = %comms_id, error = %error_msg, + permanent, "Failed to send comms" ); - if let Err(db_err) = self.mark_failed(comms_id, &error_msg).await { + let db_result = match permanent { + true => self.mark_failed_permanent(comms_id, &error_msg).await, + false => self.mark_failed(comms_id, &error_msg).await, + }; + if let Err(db_err) = db_result { error!( comms_id = %comms_id, error = %db_err, @@ -173,6 +179,14 @@ impl CommsService { async fn mark_failed(&self, id: Uuid, error: &str) -> Result<(), tranquil_db_traits::DbError> { self.infra_repo.mark_comms_failed(id, error).await } + + async fn mark_failed_permanent( + &self, + id: Uuid, + error: &str, + ) -> Result<(), tranquil_db_traits::DbError> { + self.infra_repo.mark_comms_failed_permanent(id, error).await + } } struct ResolvedRecipient { diff --git a/crates/tranquil-pds/tests/security_fixes.rs b/crates/tranquil-pds/tests/security_fixes.rs index 6db6874..38d2298 100644 --- a/crates/tranquil-pds/tests/security_fixes.rs +++ b/crates/tranquil-pds/tests/security_fixes.rs @@ -1,43 +1,7 @@ mod common; -use tranquil_pds::comms::{ - SendError, is_valid_phone_number, is_valid_signal_username, sanitize_header_value, -}; +use tranquil_pds::comms::{SendError, is_valid_phone_number, is_valid_signal_username}; use tranquil_pds::image::{ImageError, ImageProcessor}; -#[test] -fn test_header_injection_sanitization() { - let malicious = "Injected\r\nBcc: attacker@evil.com"; - let sanitized = sanitize_header_value(malicious); - assert!(!sanitized.contains('\r') && !sanitized.contains('\n')); - assert!(sanitized.contains("Injected") && sanitized.contains("Bcc:")); - - let normal = "Normal Subject Line"; - assert_eq!(sanitize_header_value(normal), "Normal Subject Line"); - - let padded = " Subject "; - assert_eq!(sanitize_header_value(padded), "Subject"); - - let multi_newline = "Line1\r\nLine2\nLine3\rLine4"; - let sanitized = sanitize_header_value(multi_newline); - assert!(!sanitized.contains('\r') && !sanitized.contains('\n')); - assert!(sanitized.contains("Line1") && sanitized.contains("Line4")); - - let header_injection = "Normal Subject\r\nBcc: attacker@evil.com\r\nX-Injected: value"; - let sanitized = sanitize_header_value(header_injection); - assert_eq!(sanitized.split("\r\n").count(), 1); - assert!( - sanitized.contains("Normal Subject") - && sanitized.contains("Bcc:") - && sanitized.contains("X-Injected:") - ); - - let with_null = "client\0id"; - assert!(sanitize_header_value(with_null).contains("client")); - - let long_input = "x".repeat(10000); - assert!(!sanitize_header_value(&long_input).is_empty()); -} - #[test] fn test_phone_number_validation() { assert!(is_valid_phone_number("+1234567890")); diff --git a/crates/tranquil-store/src/metastore/client.rs b/crates/tranquil-store/src/metastore/client.rs index 72fc61f..42bb1ce 100644 --- a/crates/tranquil-store/src/metastore/client.rs +++ b/crates/tranquil-store/src/metastore/client.rs @@ -1860,6 +1860,18 @@ impl tranquil_db_traits::InfraRepository for MetastoreCl recv(rx).await } + async fn mark_comms_failed_permanent(&self, id: Uuid, error: &str) -> Result<(), DbError> { + let (tx, rx) = oneshot::channel(); + self.pool.send(MetastoreRequest::Infra( + InfraRequest::MarkCommsFailedPermanent { + id, + error: error.to_owned(), + tx, + }, + ))?; + recv(rx).await + } + async fn create_invite_code( &self, code: &str, diff --git a/crates/tranquil-store/src/metastore/handler.rs b/crates/tranquil-store/src/metastore/handler.rs index b8e98d1..118cdd3 100644 --- a/crates/tranquil-store/src/metastore/handler.rs +++ b/crates/tranquil-store/src/metastore/handler.rs @@ -1789,6 +1789,11 @@ pub enum InfraRequest { error: String, tx: Tx<()>, }, + MarkCommsFailedPermanent { + id: Uuid, + error: String, + tx: Tx<()>, + }, CreateInviteCode { code: String, use_count: i32, @@ -3888,6 +3893,14 @@ fn dispatch_infra(state: &HandlerState, req: InfraRequest) { .map_err(metastore_to_db); let _ = tx.send(result); } + InfraRequest::MarkCommsFailedPermanent { id, error, tx } => { + let result = state + .metastore + .infra_ops() + .mark_comms_failed_permanent(id, &error) + .map_err(metastore_to_db); + let _ = tx.send(result); + } InfraRequest::CreateInviteCode { code, use_count, diff --git a/crates/tranquil-store/src/metastore/infra_ops.rs b/crates/tranquil-store/src/metastore/infra_ops.rs index 800cbf1..cef98dd 100644 --- a/crates/tranquil-store/src/metastore/infra_ops.rs +++ b/crates/tranquil-store/src/metastore/infra_ops.rs @@ -247,7 +247,6 @@ impl InfraOps { val.status = status_to_u8(CommsStatus::Sent); val.sent_at_ms = Some(Utc::now().timestamp_millis()); - val.attempts = val.attempts.saturating_add(1); let mut batch = self.db.batch(); batch.insert(&self.infra, key.as_slice(), val.serialize()); @@ -272,9 +271,46 @@ impl InfraOps { )? .ok_or(MetastoreError::InvalidInput("comms entry not found"))?; + let next_attempts = val.attempts.saturating_add(1); + let exhausted = next_attempts >= val.max_attempts; + let next_status = match exhausted { + true => CommsStatus::Failed, + false => CommsStatus::Pending, + }; + let now_ms = Utc::now().timestamp_millis(); + let backoff_ms = i64::from(next_attempts).saturating_mul(60_000); + + val.status = status_to_u8(next_status); + val.error_message = Some(error.to_owned()); + val.attempts = next_attempts; + val.scheduled_for_ms = now_ms.saturating_add(backoff_ms); + + let mut batch = self.db.batch(); + batch.insert(&self.infra, key.as_slice(), val.serialize()); + + if let Some((hk, mut hv)) = + self.find_history_entry(val.user_id.unwrap_or(Uuid::nil()), val.id)? + { + hv.status = status_to_u8(next_status); + batch.insert(&self.infra, hk.as_slice(), hv.serialize()); + } + + batch.commit().map_err(MetastoreError::Fjall) + } + + pub fn mark_comms_failed_permanent(&self, id: Uuid, error: &str) -> Result<(), MetastoreError> { + let key = comms_queue_key(id); + let mut val: QueuedCommsValue = point_lookup( + &self.infra, + key.as_slice(), + QueuedCommsValue::deserialize, + "corrupt comms queue entry", + )? + .ok_or(MetastoreError::InvalidInput("comms entry not found"))?; + val.status = status_to_u8(CommsStatus::Failed); val.error_message = Some(error.to_owned()); - val.attempts = val.attempts.saturating_add(1); + val.attempts = val.max_attempts; let mut batch = self.db.batch(); batch.insert(&self.infra, key.as_slice(), val.serialize());