feat(comms): EmailSender, permanent/transient routing

Lewis: May this revision serve well! <lu5a@proton.me>
This commit is contained in:
Lewis
2026-04-30 12:27:10 +03:00
committed by Tangled
parent 2462d0ab3b
commit eee6fb9ff4
13 changed files with 537 additions and 158 deletions

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE comms_queue\n SET status = 'failed'::comms_status,\n attempts = max_attempts,\n last_error = $2,\n updated_at = NOW()\n WHERE id = $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text"
]
},
"nullable": []
},
"hash": "5bee4ed5296667e4ca7e1a97aec28d30a470b8aee7b378ec9ca4e34de4faf349"
}

View File

@@ -0,0 +1,158 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE comms_queue\n SET status = 'processing', updated_at = NOW()\n WHERE id IN (\n SELECT id FROM comms_queue\n WHERE attempts < max_attempts\n AND scheduled_for <= $1\n AND (\n status = 'pending'\n OR (status = 'processing'\n AND updated_at < $1 - INTERVAL '10 minutes')\n )\n ORDER BY scheduled_for ASC\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n )\n RETURNING\n id, user_id,\n channel as \"channel: CommsChannel\",\n comms_type as \"comms_type: CommsType\",\n status as \"status: CommsStatus\",\n recipient, subject, body, metadata,\n attempts, max_attempts, last_error,\n created_at, updated_at, scheduled_for, processed_at",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "user_id",
"type_info": "Uuid"
},
{
"ordinal": 2,
"name": "channel: CommsChannel",
"type_info": {
"Custom": {
"name": "comms_channel",
"kind": {
"Enum": [
"email",
"discord",
"telegram",
"signal"
]
}
}
}
},
{
"ordinal": 3,
"name": "comms_type: CommsType",
"type_info": {
"Custom": {
"name": "comms_type",
"kind": {
"Enum": [
"welcome",
"email_verification",
"password_reset",
"email_update",
"account_deletion",
"admin_email",
"plc_operation",
"two_factor_code",
"channel_verification",
"passkey_recovery",
"legacy_login_alert",
"migration_verification",
"channel_verified"
]
}
}
}
},
{
"ordinal": 4,
"name": "status: CommsStatus",
"type_info": {
"Custom": {
"name": "comms_status",
"kind": {
"Enum": [
"pending",
"processing",
"sent",
"failed"
]
}
}
}
},
{
"ordinal": 5,
"name": "recipient",
"type_info": "Text"
},
{
"ordinal": 6,
"name": "subject",
"type_info": "Text"
},
{
"ordinal": 7,
"name": "body",
"type_info": "Text"
},
{
"ordinal": 8,
"name": "metadata",
"type_info": "Jsonb"
},
{
"ordinal": 9,
"name": "attempts",
"type_info": "Int4"
},
{
"ordinal": 10,
"name": "max_attempts",
"type_info": "Int4"
},
{
"ordinal": 11,
"name": "last_error",
"type_info": "Text"
},
{
"ordinal": 12,
"name": "created_at",
"type_info": "Timestamptz"
},
{
"ordinal": 13,
"name": "updated_at",
"type_info": "Timestamptz"
},
{
"ordinal": 14,
"name": "scheduled_for",
"type_info": "Timestamptz"
},
{
"ordinal": 15,
"name": "processed_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Timestamptz",
"Int8"
]
},
"nullable": [
false,
false,
false,
false,
false,
false,
true,
false,
true,
false,
false,
true,
false,
false,
false,
true
]
},
"hash": "890aa92acdcb0fe2a3bf04d87e1f16a801d271da7cedc32fc42c2ef5b100faae"
}

View File

@@ -0,0 +1,194 @@
pub mod dkim;
pub mod message;
mod mx;
pub mod transport;
pub mod types;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use hickory_resolver::TokioAsyncResolver;
use lettre::message::Mailbox;
use lettre::transport::smtp::AsyncSmtpTransport;
use lettre::transport::smtp::PoolConfig;
use lettre::transport::smtp::authentication::Credentials;
use lettre::transport::smtp::extension::ClientId;
use tokio::sync::Semaphore;
use tracing::{info, warn};
pub use self::dkim::DkimSigner;
pub use self::transport::SendMode;
use self::types::{
DkimKeyPath, DkimSelector, EmailDomain, HeloName, SmtpHost, SmtpPassword, SmtpPort,
SmtpUsername, TlsMode,
};
use crate::sender::{CommsSender, SendError};
use crate::types::{CommsChannel, QueuedComms};
pub struct EmailSender {
from: Mailbox,
mode: SendMode,
dkim: Option<DkimSigner>,
}
impl EmailSender {
pub fn new(from: Mailbox, mode: SendMode, dkim: Option<DkimSigner>) -> Self {
Self { from, mode, dkim }
}
pub fn from_config(cfg: &tranquil_config::TranquilConfig) -> Result<Option<Self>, SendError> {
let Some(from_address) = cfg.email.from_address.as_deref().filter(|s| !s.is_empty()) else {
info!("Email sender disabled: MAIL_FROM_ADDRESS unset");
return Ok(None);
};
let from = build_from(&cfg.email.from_name, from_address)?;
let dkim = build_dkim(&cfg.email.dkim)?;
let mode = match cfg
.email
.smarthost
.host
.as_deref()
.filter(|h| !h.is_empty())
{
Some(host) => build_smarthost(cfg, host)?,
None => build_direct_mx(cfg)?,
};
info!(?mode, dkim = dkim.is_some(), "Email sender initialized");
Ok(Some(Self { from, mode, dkim }))
}
}
fn config_invalid(field: &str, error: impl std::fmt::Display) -> SendError {
SendError::ConfigInvalid(format!("{field}: {error}"))
}
fn build_from(from_name: &str, from_address: &str) -> Result<Mailbox, SendError> {
let raw = match from_name.is_empty() {
true => from_address.to_string(),
false => format!("\"{}\" <{}>", from_name.replace('"', "'"), from_address),
};
raw.parse::<Mailbox>()
.map_err(|e| config_invalid("MAIL_FROM_ADDRESS / MAIL_FROM_NAME", e))
}
fn build_smarthost(
cfg: &tranquil_config::TranquilConfig,
host_raw: &str,
) -> Result<SendMode, SendError> {
let host = SmtpHost::parse(host_raw).map_err(|e| config_invalid("MAIL_SMARTHOST_HOST", e))?;
let port = SmtpPort::parse(cfg.email.smarthost.port)
.map_err(|e| config_invalid("MAIL_SMARTHOST_PORT", e))?;
let tls = TlsMode::parse(&cfg.email.smarthost.tls)
.map_err(|e| config_invalid("MAIL_SMARTHOST_TLS", e))?;
let helo = resolve_helo(cfg)?;
let pool = PoolConfig::new()
.max_size(cfg.email.smarthost.pool_size)
.idle_timeout(Duration::from_secs(60));
let command_timeout = Duration::from_secs(cfg.email.smarthost.command_timeout_secs);
let total_timeout = Duration::from_secs(cfg.email.smarthost.total_timeout_secs);
let builder = match tls {
TlsMode::Implicit => AsyncSmtpTransport::<lettre::Tokio1Executor>::relay(host.as_str())
.map_err(|e| config_invalid("smarthost TLS setup", e))?,
TlsMode::Starttls => {
AsyncSmtpTransport::<lettre::Tokio1Executor>::starttls_relay(host.as_str())
.map_err(|e| config_invalid("smarthost TLS setup", e))?
}
TlsMode::None => {
AsyncSmtpTransport::<lettre::Tokio1Executor>::builder_dangerous(host.as_str())
}
};
let builder = builder
.port(port.as_u16())
.hello_name(ClientId::Domain(helo.into_inner()))
.timeout(Some(command_timeout))
.pool_config(pool);
let builder = match (
cfg.email.smarthost.username.as_deref(),
cfg.email.smarthost.password.as_deref(),
) {
(Some(u), Some(p)) => {
let username =
SmtpUsername::parse(u).map_err(|e| config_invalid("MAIL_SMARTHOST_USERNAME", e))?;
let password =
SmtpPassword::parse(p).map_err(|e| config_invalid("MAIL_SMARTHOST_PASSWORD", e))?;
builder.credentials(Credentials::new(
username.into_inner(),
password.expose().to_string(),
))
}
_ => builder,
};
Ok(SendMode::Smarthost {
transport: Box::new(builder.build()),
total_timeout,
})
}
fn build_direct_mx(cfg: &tranquil_config::TranquilConfig) -> Result<SendMode, SendError> {
let helo = resolve_helo(cfg)?;
let resolver = TokioAsyncResolver::tokio_from_system_conf()
.map(Arc::new)
.map_err(|e| config_invalid("system DNS configuration", e))?;
let max_concurrent = cfg.email.direct_mx.max_concurrent_sends.max(1);
Ok(SendMode::DirectMx {
resolver,
helo,
command_timeout: Duration::from_secs(cfg.email.direct_mx.command_timeout_secs),
total_timeout: Duration::from_secs(cfg.email.direct_mx.total_timeout_secs),
require_tls: cfg.email.direct_mx.require_tls,
inflight: Arc::new(Semaphore::new(max_concurrent)),
})
}
fn resolve_helo(cfg: &tranquil_config::TranquilConfig) -> Result<HeloName, SendError> {
let raw = cfg
.email
.helo_name
.clone()
.unwrap_or_else(|| cfg.server.hostname_without_port().to_string());
HeloName::parse(&raw).map_err(|e| config_invalid(&format!("HELO name {raw:?}"), e))
}
fn build_dkim(cfg: &tranquil_config::DkimConfig) -> Result<Option<DkimSigner>, SendError> {
let selector = match cfg.selector.as_deref() {
Some(s) => s,
None => return Ok(None),
};
let domain = cfg
.domain
.as_deref()
.ok_or_else(|| SendError::DkimSign("MAIL_DKIM_DOMAIN required when selector set".into()))?;
let key_path = cfg.private_key_path.as_deref().ok_or_else(|| {
SendError::DkimSign("MAIL_DKIM_KEY_PATH required when selector set".into())
})?;
let selector = DkimSelector::parse(selector)
.map_err(|e| SendError::DkimSign(format!("invalid DKIM selector: {e}")))?;
let domain = EmailDomain::parse(domain)
.map_err(|e| SendError::DkimSign(format!("invalid DKIM domain: {e}")))?;
let path = DkimKeyPath::parse(key_path)
.map_err(|e| SendError::DkimSign(format!("DKIM key path invalid: {e}")))?;
DkimSigner::load(selector, domain, path).map(Some)
}
#[async_trait]
impl CommsSender for EmailSender {
fn channel(&self) -> CommsChannel {
CommsChannel::Email
}
async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
let mut message = message::build(&self.from, notification)?;
if let Some(signer) = &self.dkim {
signer.sign(&mut message);
}
match transport::dispatch(&self.mode, message).await {
Ok(()) => Ok(()),
Err(e) => {
warn!(comms_id = %notification.id, error = %e, "SMTP send failed");
Err(e)
}
}
}
}

View File

@@ -1,13 +1,15 @@
pub mod email;
mod locale;
mod sender;
mod types;
pub use email::EmailSender;
pub use locale::{
DEFAULT_LOCALE, NotificationStrings, VALID_LOCALES, format_message, get_strings,
validate_locale,
};
pub use sender::{
CommsSender, DiscordSender, EmailSender, SendError, SignalSender, TelegramSender,
is_valid_phone_number, is_valid_signal_username, mime_encode_header, sanitize_header_value,
CommsSender, DiscordSender, SendError, SignalSender, TelegramSender, is_valid_phone_number,
is_valid_signal_username,
};
pub use types::{CommsChannel, CommsStatus, CommsType, NewComms, QueuedComms};

View File

@@ -1,11 +1,7 @@
use async_trait::async_trait;
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use reqwest::Client;
use serde_json::json;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use super::types::{CommsChannel, QueuedComms};
@@ -21,25 +17,51 @@ pub trait CommsSender: Send + Sync {
#[derive(Debug, thiserror::Error)]
pub enum SendError {
#[error("Failed to spawn {command}: {source}")]
ProcessSpawn {
command: String,
source: std::io::Error,
},
#[error("{command} exited with non-zero status: {detail}")]
ProcessFailed { command: String, detail: String },
#[error("Channel not configured: {0:?}")]
NotConfigured(CommsChannel),
#[error("External service error: {0}")]
ExternalService(String),
#[error("Email configuration invalid: {0}")]
ConfigInvalid(String),
#[error("Invalid recipient format: {0}")]
InvalidRecipient(String),
#[error("Message construction failed: {0}")]
MessageBuild(String),
#[error("transient DNS lookup failure: {0}")]
DnsTransient(String),
#[error("permanent DNS lookup failure: {0}")]
DnsPermanent(String),
#[error("SMTP transient error: {0}")]
SmtpTransient(String),
#[error("SMTP permanent error: {0}")]
SmtpPermanent(String),
#[error("DKIM signing failed: {0}")]
DkimSign(String),
#[error("External service error: {0}")]
ExternalService(String),
#[error("Request timeout")]
Timeout,
#[error("Max retries exceeded: {0}")]
MaxRetriesExceeded(String),
}
impl SendError {
pub fn is_permanent(&self) -> bool {
match self {
Self::SmtpPermanent(_)
| Self::DnsPermanent(_)
| Self::InvalidRecipient(_)
| Self::MessageBuild(_)
| Self::DkimSign(_)
| Self::ConfigInvalid(_) => true,
Self::SmtpTransient(_)
| Self::DnsTransient(_)
| Self::Timeout
| Self::ExternalService(_)
| Self::MaxRetriesExceeded(_)
| Self::NotConfigured(_) => false,
}
}
}
fn create_http_client() -> Client {
Client::builder()
.timeout(Duration::from_secs(HTTP_TIMEOUT_SECS))
@@ -100,19 +122,6 @@ where
))
}
pub fn sanitize_header_value(value: &str) -> String {
value.replace(['\r', '\n'], " ").trim().to_string()
}
pub fn mime_encode_header(value: &str) -> String {
if value.is_ascii() {
sanitize_header_value(value)
} else {
let sanitized = sanitize_header_value(value);
format!("=?UTF-8?B?{}?=", BASE64.encode(sanitized.as_bytes()))
}
}
pub fn escape_html(text: &str) -> String {
text.replace('&', "&amp;")
.replace('<', "&lt;")
@@ -135,93 +144,6 @@ pub fn is_valid_signal_username(username: &str) -> bool {
tranquil_signal::SignalUsername::parse(username).is_ok()
}
pub struct EmailSender {
from_address: String,
from_name: String,
sendmail_path: String,
}
impl EmailSender {
pub fn new(from_address: String, from_name: String, sendmail_path: String) -> Self {
Self {
from_address,
from_name,
sendmail_path,
}
}
pub fn from_config(cfg: &tranquil_config::TranquilConfig) -> Option<Self> {
let from_address = cfg.email.from_address.clone()?;
let from_name = cfg.email.from_name.clone();
let sendmail_path = cfg.email.sendmail_path.clone();
Some(Self::new(from_address, from_name, sendmail_path))
}
pub fn format_email(&self, notification: &QueuedComms) -> String {
let subject = mime_encode_header(notification.subject.as_deref().unwrap_or("Notification"));
let recipient = sanitize_header_value(&notification.recipient);
let from_header = if self.from_name.is_empty() {
self.from_address.clone()
} else {
format!(
"{} <{}>",
sanitize_header_value(&self.from_name),
self.from_address
)
};
format!(
"From: {}\r\nTo: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\nMIME-Version: 1.0\r\n\r\n{}",
from_header, recipient, subject, notification.body
)
}
}
#[async_trait]
impl CommsSender for EmailSender {
fn channel(&self) -> CommsChannel {
CommsChannel::Email
}
async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
let email_content = self.format_email(notification);
let mut child = Command::new(&self.sendmail_path)
.arg("-t")
.arg("-oi")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| SendError::ProcessSpawn {
command: self.sendmail_path.clone(),
source: e,
})?;
if let Some(mut stdin) = child.stdin.take() {
stdin
.write_all(email_content.as_bytes())
.await
.map_err(|e| SendError::ProcessSpawn {
command: self.sendmail_path.clone(),
source: e,
})?;
}
let output = child
.wait_with_output()
.await
.map_err(|e| SendError::ProcessSpawn {
command: self.sendmail_path.clone(),
source: e,
})?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(SendError::ProcessFailed {
command: self.sendmail_path.clone(),
detail: stderr.to_string(),
});
}
Ok(())
}
}
const DISCORD_API_BASE: &str = "https://discord.com/api/v10";
#[derive(Clone)]
@@ -610,3 +532,28 @@ impl CommsSender for SignalSender {
))
}
}
#[cfg(test)]
mod is_permanent_matrix {
use super::{CommsChannel, SendError};
#[test]
fn permanent_variants_are_permanent() {
assert!(SendError::SmtpPermanent("x".into()).is_permanent());
assert!(SendError::DnsPermanent("x".into()).is_permanent());
assert!(SendError::InvalidRecipient("x".into()).is_permanent());
assert!(SendError::MessageBuild("x".into()).is_permanent());
assert!(SendError::DkimSign("x".into()).is_permanent());
assert!(SendError::ConfigInvalid("x".into()).is_permanent());
}
#[test]
fn transient_variants_are_not_permanent() {
assert!(!SendError::SmtpTransient("x".into()).is_permanent());
assert!(!SendError::DnsTransient("x".into()).is_permanent());
assert!(!SendError::Timeout.is_permanent());
assert!(!SendError::ExternalService("x".into()).is_permanent());
assert!(!SendError::MaxRetriesExceeded("x".into()).is_permanent());
assert!(!SendError::NotConfigured(CommsChannel::Email).is_permanent());
}
}

View File

@@ -244,6 +244,8 @@ pub trait InfraRepository: Send + Sync {
async fn mark_comms_failed(&self, id: Uuid, error: &str) -> Result<(), DbError>;
async fn mark_comms_failed_permanent(&self, id: Uuid, error: &str) -> Result<(), DbError>;
async fn create_invite_code(
&self,
code: &str,

View File

@@ -65,9 +65,13 @@ impl InfraRepository for PostgresInfraRepository {
SET status = 'processing', updated_at = NOW()
WHERE id IN (
SELECT id FROM comms_queue
WHERE status = 'pending'
WHERE attempts < max_attempts
AND scheduled_for <= $1
AND attempts < max_attempts
AND (
status = 'pending'
OR (status = 'processing'
AND updated_at < $1 - INTERVAL '10 minutes')
)
ORDER BY scheduled_for ASC
LIMIT $2
FOR UPDATE SKIP LOCKED
@@ -127,6 +131,24 @@ impl InfraRepository for PostgresInfraRepository {
Ok(())
}
async fn mark_comms_failed_permanent(&self, id: Uuid, error: &str) -> Result<(), DbError> {
sqlx::query!(
r#"UPDATE comms_queue
SET status = 'failed'::comms_status,
attempts = max_attempts,
last_error = $2,
updated_at = NOW()
WHERE id = $1"#,
id,
error
)
.execute(&self.pool)
.await
.map_err(map_sqlx_error)?;
Ok(())
}
async fn create_invite_code(
&self,
code: &str,

View File

@@ -4,7 +4,7 @@ pub use tranquil_comms::{
CommsChannel, CommsSender, CommsStatus, CommsType, DEFAULT_LOCALE, DiscordSender, EmailSender,
NewComms, NotificationStrings, QueuedComms, SendError, SignalSender, TelegramSender,
VALID_LOCALES, format_message, get_strings, is_valid_phone_number, is_valid_signal_username,
mime_encode_header, sanitize_header_value, validate_locale,
validate_locale,
};
pub use service::{CommsService, repo as comms_repo, resolve_delivery_channel};

View File

@@ -149,13 +149,19 @@ impl CommsService {
}
}
Err(e) => {
let permanent = e.is_permanent();
let error_msg = e.to_string();
warn!(
comms_id = %comms_id,
error = %error_msg,
permanent,
"Failed to send comms"
);
if let Err(db_err) = self.mark_failed(comms_id, &error_msg).await {
let db_result = match permanent {
true => self.mark_failed_permanent(comms_id, &error_msg).await,
false => self.mark_failed(comms_id, &error_msg).await,
};
if let Err(db_err) = db_result {
error!(
comms_id = %comms_id,
error = %db_err,
@@ -173,6 +179,14 @@ impl CommsService {
async fn mark_failed(&self, id: Uuid, error: &str) -> Result<(), tranquil_db_traits::DbError> {
self.infra_repo.mark_comms_failed(id, error).await
}
async fn mark_failed_permanent(
&self,
id: Uuid,
error: &str,
) -> Result<(), tranquil_db_traits::DbError> {
self.infra_repo.mark_comms_failed_permanent(id, error).await
}
}
struct ResolvedRecipient {

View File

@@ -1,43 +1,7 @@
mod common;
use tranquil_pds::comms::{
SendError, is_valid_phone_number, is_valid_signal_username, sanitize_header_value,
};
use tranquil_pds::comms::{SendError, is_valid_phone_number, is_valid_signal_username};
use tranquil_pds::image::{ImageError, ImageProcessor};
#[test]
fn test_header_injection_sanitization() {
let malicious = "Injected\r\nBcc: attacker@evil.com";
let sanitized = sanitize_header_value(malicious);
assert!(!sanitized.contains('\r') && !sanitized.contains('\n'));
assert!(sanitized.contains("Injected") && sanitized.contains("Bcc:"));
let normal = "Normal Subject Line";
assert_eq!(sanitize_header_value(normal), "Normal Subject Line");
let padded = " Subject ";
assert_eq!(sanitize_header_value(padded), "Subject");
let multi_newline = "Line1\r\nLine2\nLine3\rLine4";
let sanitized = sanitize_header_value(multi_newline);
assert!(!sanitized.contains('\r') && !sanitized.contains('\n'));
assert!(sanitized.contains("Line1") && sanitized.contains("Line4"));
let header_injection = "Normal Subject\r\nBcc: attacker@evil.com\r\nX-Injected: value";
let sanitized = sanitize_header_value(header_injection);
assert_eq!(sanitized.split("\r\n").count(), 1);
assert!(
sanitized.contains("Normal Subject")
&& sanitized.contains("Bcc:")
&& sanitized.contains("X-Injected:")
);
let with_null = "client\0id";
assert!(sanitize_header_value(with_null).contains("client"));
let long_input = "x".repeat(10000);
assert!(!sanitize_header_value(&long_input).is_empty());
}
#[test]
fn test_phone_number_validation() {
assert!(is_valid_phone_number("+1234567890"));

View File

@@ -1860,6 +1860,18 @@ impl<S: StorageIO + 'static> tranquil_db_traits::InfraRepository for MetastoreCl
recv(rx).await
}
async fn mark_comms_failed_permanent(&self, id: Uuid, error: &str) -> Result<(), DbError> {
let (tx, rx) = oneshot::channel();
self.pool.send(MetastoreRequest::Infra(
InfraRequest::MarkCommsFailedPermanent {
id,
error: error.to_owned(),
tx,
},
))?;
recv(rx).await
}
async fn create_invite_code(
&self,
code: &str,

View File

@@ -1789,6 +1789,11 @@ pub enum InfraRequest {
error: String,
tx: Tx<()>,
},
MarkCommsFailedPermanent {
id: Uuid,
error: String,
tx: Tx<()>,
},
CreateInviteCode {
code: String,
use_count: i32,
@@ -3888,6 +3893,14 @@ fn dispatch_infra<S: StorageIO>(state: &HandlerState<S>, req: InfraRequest) {
.map_err(metastore_to_db);
let _ = tx.send(result);
}
InfraRequest::MarkCommsFailedPermanent { id, error, tx } => {
let result = state
.metastore
.infra_ops()
.mark_comms_failed_permanent(id, &error)
.map_err(metastore_to_db);
let _ = tx.send(result);
}
InfraRequest::CreateInviteCode {
code,
use_count,

View File

@@ -247,7 +247,6 @@ impl InfraOps {
val.status = status_to_u8(CommsStatus::Sent);
val.sent_at_ms = Some(Utc::now().timestamp_millis());
val.attempts = val.attempts.saturating_add(1);
let mut batch = self.db.batch();
batch.insert(&self.infra, key.as_slice(), val.serialize());
@@ -272,9 +271,46 @@ impl InfraOps {
)?
.ok_or(MetastoreError::InvalidInput("comms entry not found"))?;
let next_attempts = val.attempts.saturating_add(1);
let exhausted = next_attempts >= val.max_attempts;
let next_status = match exhausted {
true => CommsStatus::Failed,
false => CommsStatus::Pending,
};
let now_ms = Utc::now().timestamp_millis();
let backoff_ms = i64::from(next_attempts).saturating_mul(60_000);
val.status = status_to_u8(next_status);
val.error_message = Some(error.to_owned());
val.attempts = next_attempts;
val.scheduled_for_ms = now_ms.saturating_add(backoff_ms);
let mut batch = self.db.batch();
batch.insert(&self.infra, key.as_slice(), val.serialize());
if let Some((hk, mut hv)) =
self.find_history_entry(val.user_id.unwrap_or(Uuid::nil()), val.id)?
{
hv.status = status_to_u8(next_status);
batch.insert(&self.infra, hk.as_slice(), hv.serialize());
}
batch.commit().map_err(MetastoreError::Fjall)
}
pub fn mark_comms_failed_permanent(&self, id: Uuid, error: &str) -> Result<(), MetastoreError> {
let key = comms_queue_key(id);
let mut val: QueuedCommsValue = point_lookup(
&self.infra,
key.as_slice(),
QueuedCommsValue::deserialize,
"corrupt comms queue entry",
)?
.ok_or(MetastoreError::InvalidInput("comms entry not found"))?;
val.status = status_to_u8(CommsStatus::Failed);
val.error_message = Some(error.to_owned());
val.attempts = val.attempts.saturating_add(1);
val.attempts = val.max_attempts;
let mut batch = self.db.batch();
batch.insert(&self.infra, key.as_slice(), val.serialize());