Creator tax compliance (#4254)

* Initial implementation

* Remove test code

* Query cache

* Appease clippy

* Precise TIN/SSN

* Make tax threshold customizable via env variable

* Address review comments
This commit is contained in:
François-Xavier Talbot
2025-08-25 12:34:58 -04:00
committed by GitHub
parent ca36d11570
commit 006b19e3c9
16 changed files with 773 additions and 37 deletions

View File

@@ -139,6 +139,8 @@ pub enum ApiError {
NotFound,
#[error("Conflict: {0}")]
Conflict(String),
#[error("External tax compliance API Error")]
TaxComplianceApi,
#[error(
"You are being rate-limited. Please wait {0} milliseconds. 0/{1} remaining."
)]
@@ -175,6 +177,7 @@ impl ApiError {
ApiError::Reroute(..) => "reroute_error",
ApiError::NotFound => "not_found",
ApiError::Conflict(..) => "conflict",
ApiError::TaxComplianceApi => "tax_compliance_api_error",
ApiError::Zip(..) => "zip_error",
ApiError::Io(..) => "io_error",
ApiError::RateLimitError(..) => "ratelimit_error",
@@ -212,6 +215,7 @@ impl actix_web::ResponseError for ApiError {
ApiError::Reroute(..) => StatusCode::INTERNAL_SERVER_ERROR,
ApiError::NotFound => StatusCode::NOT_FOUND,
ApiError::Conflict(..) => StatusCode::CONFLICT,
ApiError::TaxComplianceApi => StatusCode::INTERNAL_SERVER_ERROR,
ApiError::Zip(..) => StatusCode::BAD_REQUEST,
ApiError::Io(..) => StatusCode::BAD_REQUEST,
ApiError::RateLimitError(..) => StatusCode::TOO_MANY_REQUESTS,

View File

@@ -1,6 +1,7 @@
use crate::auth::validate::get_user_record_from_bearer_token;
use crate::auth::{AuthenticationError, get_user_from_headers};
use crate::database::models::generate_payout_id;
use crate::database::models::DBUserId;
use crate::database::models::{generate_payout_id, users_compliance};
use crate::database::redis::RedisPool;
use crate::models::ids::PayoutId;
use crate::models::pats::Scopes;
@@ -8,6 +9,7 @@ use crate::models::payouts::{PayoutMethodType, PayoutStatus};
use crate::queue::payouts::PayoutsQueue;
use crate::queue::session::AuthQueue;
use crate::routes::ApiError;
use crate::util::avalara1099;
use actix_web::{HttpRequest, HttpResponse, delete, get, post, web};
use chrono::{DateTime, Duration, Utc};
use hex::ToHex;
@@ -19,6 +21,10 @@ use serde_json::json;
use sha2::Sha256;
use sqlx::PgPool;
use std::collections::HashMap;
use tracing::error;
const COMPLIANCE_CHECK_DEBOUNCE: chrono::Duration =
chrono::Duration::seconds(15);
pub fn config(cfg: &mut web::ServiceConfig) {
cfg.service(
@@ -30,10 +36,102 @@ pub fn config(cfg: &mut web::ServiceConfig) {
.service(cancel_payout)
.service(payment_methods)
.service(get_balance)
.service(platform_revenue),
.service(platform_revenue)
.service(post_compliance_form),
);
}
#[derive(Deserialize)]
pub struct RequestForm {
form_type: users_compliance::FormType,
}
#[post("compliance")]
pub async fn post_compliance_form(
req: HttpRequest,
pool: web::Data<PgPool>,
redis: web::Data<RedisPool>,
body: web::Json<RequestForm>,
session_queue: web::Data<AuthQueue>,
) -> Result<HttpResponse, ApiError> {
let user = get_user_from_headers(
&req,
&**pool,
&redis,
&session_queue,
Scopes::PAYOUTS_WRITE,
)
.await?
.1;
let user_id = DBUserId(user.id.0 as i64);
let mut txn = pool.begin().await?;
let maybe_compliance =
users_compliance::UserCompliance::get_by_user_id(&mut *txn, user_id)
.await?;
let mut compliance = match maybe_compliance {
Some(c) => c,
None => users_compliance::UserCompliance {
id: 0,
user_id,
requested: Utc::now(),
signed: None,
last_checked: Utc::now() - COMPLIANCE_CHECK_DEBOUNCE,
external_request_id: String::new(),
reference_id: String::new(),
e_delivery_consented: false,
tin_matched: false,
form_type: body.0.form_type,
},
};
let result = avalara1099::request_form(user_id, body.0.form_type).await?;
match result {
Ok(
ref toplevel @ avalara1099::DataWrapper {
data:
avalara1099::Data {
r#type: _,
id: Some(ref request_id),
ref attributes,
links: _,
},
},
) => {
compliance.external_request_id = request_id.clone();
compliance.reference_id = attributes.reference_id.clone();
compliance.requested = Utc::now();
compliance.e_delivery_consented = false;
compliance.tin_matched = false;
compliance.signed = None;
compliance.form_type = body.0.form_type;
compliance.last_checked = Utc::now() - COMPLIANCE_CHECK_DEBOUNCE;
compliance.upsert(&mut *txn).await?;
txn.commit().await?;
Ok(HttpResponse::Ok().json(toplevel))
}
Ok(_) => {
error!("Missing form request ID in Avalara response");
Err(ApiError::TaxComplianceApi)
}
Err(json_error) => {
error!(
"Error sending request to Avalara: {}",
serde_json::to_string_pretty(&json_error).unwrap()
);
Err(ApiError::TaxComplianceApi)
}
}
}
#[post("_paypal")]
pub async fn paypal_webhook(
req: HttpRequest,
@@ -391,6 +489,45 @@ pub async fn create_payout(
));
}
if let Some(threshold) = tax_compliance_payout_threshold() {
let maybe_compliance = update_compliance_status(&pool, user.id).await?;
let (tin_matched, signed, requested, api_check_failed) =
match maybe_compliance {
Some(ComplianceCheck {
model,
compliance_api_check_failed,
}) => {
let tin = model.tin_matched;
let signed = model.signed.is_some();
(tin, signed, true, compliance_api_check_failed)
}
None => (false, false, false, false),
};
if !(tin_matched && signed)
&& balance.withdrawn_ytd + body.amount >= threshold
{
// We propagate the error this way because we don't want to block payouts
// that would be acceptable regardless of the tax form submission status
// if the compliance API is down.
// In this case the payout is going to be blocked, so do return that we hit an
// error with the API, as this is more accurate than saying the form wasn't completed
// properly as this might be wrong!
if api_check_failed {
return Err(ApiError::TaxComplianceApi);
}
return Err(ApiError::InvalidInput(match (tin_matched, signed, requested) {
(_, false, true) => "Tax form isn't signed yet!",
(false, true, true) => "Tax form is signed, but the Tax Identification Number/SSN didn't match the IRS records. Withdrawals are blocked until the TIN/SSN matches.",
_ => "Tax compliance form is required to withdraw more!",
}.to_owned()));
}
}
let payout_method = payouts_queue
.get_payout_methods()
.await?
@@ -765,6 +902,8 @@ pub async fn payment_methods(
#[derive(Serialize)]
pub struct UserBalance {
pub available: Decimal,
pub withdrawn_lifetime: Decimal,
pub withdrawn_ytd: Decimal,
pub pending: Decimal,
pub dates: HashMap<DateTime<Utc>, Decimal>,
}
@@ -819,7 +958,10 @@ async fn get_user_balance(
let withdrawn = sqlx::query!(
"
SELECT SUM(amount) amount, SUM(fee) fee
SELECT
SUM(amount) amount,
SUM(fee) fee,
SUM(amount) FILTER (WHERE created >= DATE_TRUNC('year', NOW())) amount_this_year
FROM payouts
WHERE user_id = $1 AND (status = 'success' OR status = 'in-transit')
",
@@ -828,18 +970,19 @@ async fn get_user_balance(
.fetch_optional(pool)
.await?;
let (withdrawn, fees) =
withdrawn.map_or((Decimal::ZERO, Decimal::ZERO), |x| {
let (withdrawn, fees, withdrawn_this_year) =
withdrawn.map_or((Decimal::ZERO, Decimal::ZERO, Decimal::ZERO), |x| {
(
x.amount.unwrap_or(Decimal::ZERO),
x.fee.unwrap_or(Decimal::ZERO),
x.amount_this_year.unwrap_or(Decimal::ZERO),
)
});
Ok(UserBalance {
available: available.round_dp(16)
- withdrawn.round_dp(16)
- fees.round_dp(16),
available: (available - withdrawn - fees).round_dp(16),
withdrawn_lifetime: withdrawn.round_dp(16),
withdrawn_ytd: withdrawn_this_year.round_dp(16),
pending,
dates: payouts
.iter()
@@ -848,6 +991,88 @@ async fn get_user_balance(
})
}
struct ComplianceCheck {
model: users_compliance::UserCompliance,
compliance_api_check_failed: bool,
}
async fn update_compliance_status(
pg: &PgPool,
user_id: crate::database::models::ids::DBUserId,
) -> Result<Option<ComplianceCheck>, ApiError> {
let maybe_compliance =
users_compliance::UserCompliance::get_by_user_id(pg, user_id).await?;
let Some(mut compliance) = maybe_compliance else {
return Ok(None);
};
if compliance.signed.is_some()
|| Utc::now().signed_duration_since(compliance.last_checked)
< COMPLIANCE_CHECK_DEBOUNCE
{
Ok(Some(ComplianceCheck {
model: compliance,
compliance_api_check_failed: false,
}))
} else {
let result = avalara1099::check_form(&compliance.reference_id).await?;
let mut compliance_api_check_failed = false;
compliance.last_checked = Utc::now();
match result {
Ok(None) => {
// Means the form wasn't signed yet
compliance.signed = None;
compliance.e_delivery_consented = false;
compliance.tin_matched = false;
}
Ok(Some(avalara1099::DataWrapper {
data: avalara1099::Data { attributes, .. },
})) => {
// It's unclear what timezone the DateTime is in (as it returns a naive RFC-3339 timestamp)
// so we can just say it was signed now
compliance.signed =
(&attributes.entry_status == "signed").then(Utc::now);
compliance.e_delivery_consented =
attributes.e_delivery_consented_at.is_some();
if compliance.form_type.requires_domestic_tin_match() {
compliance.tin_matched = attributes
.tin_match_status
.as_ref()
.is_some_and(|x| x == "matched");
} else {
compliance.tin_matched = true;
}
}
Err(json_error) => {
error!(
"Error sending request to Avalara: {}",
serde_json::to_string_pretty(&json_error).unwrap()
);
compliance_api_check_failed = true;
}
}
compliance.update(pg).await?;
Ok(Some(ComplianceCheck {
model: compliance,
compliance_api_check_failed,
}))
}
}
fn tax_compliance_payout_threshold() -> Option<Decimal> {
dotenvy::var("COMPLIANCE_PAYOUT_THRESHOLD")
.ok()
.and_then(|s| s.parse().ok())
}
#[derive(Deserialize)]
pub struct RevenueQuery {
pub start: Option<DateTime<Utc>>,