Files
AstralRinth/apps/labrinth/src/queue/analytics.rs
Josiah Glosson b23d3e674f Update Rust & Java dependencies (#4540)
* Update Java dependencies

* Baseline lint fixes

* Update Rust version

* Update actix-files 0.6.6 -> 0.6.8

* Update actix-http 3.11.0 -> 3.11.2

* Update actix-rt 2.10.0 -> 2.11.0

* Update async_zip 0.0.17 -> 0.0.18

* Update async-compression 0.4.27 -> 0.4.32

* Update async-trait 0.1.88 -> 0.1.89

* Update async-tungstenite 0.30.0 -> 0.31.0

* Update const_format 0.2.34 -> 0.2.35

* Update bitflags 2.9.1 -> 2.9.4

* Update bytemuck 1.23.1 -> 1.24.0

* Update typed-path 0.11.0 -> 0.12.0

* Update chrono 0.4.41 -> 0.4.42

* Update cidre 0.11.2 -> 0.11.3

* Update clap 4.5.43 -> 4.5.48

* Update data-url 0.3.1 -> 0.3.2

* Update discord-rich-presence 0.2.5 -> 1.0.0

* Update enumset 1.1.7 -> 1.1.10

* Update flate2 1.1.2 -> 1.1.4

* Update hyper 1.6.0 -> 1.7.0

* Update hyper-util 0.1.16 -> 0.1.17

* Update iana-time-zone 0.1.63 -> 0.1.64

* Update image 0.25.6 -> 0.25.8

* Update indexmap 2.10.0 -> 2.11.4

* Update json-patch 4.0.0 -> 4.1.0

* Update meilisearch-sdk 0.29.1 -> 0.30.0

* Update clickhouse 0.13.3 -> 0.14.0

* Fix some prettier things

* Update lettre 0.11.18 -> 0.11.19

* Update phf 0.12.1 -> 0.13.1

* Update png 0.17.16 -> 0.18.0

* Update quick-xml 0.38.1 -> 0.38.3

* Update redis 0.32.4 -> 0.32.7

* Update regex 1.11.1 -> 1.11.3

* Update reqwest 0.12.22 -> 0.12.23

* Update rust_decimal 1.37.2 -> 1.38.0

* Update rust-s3 0.35.1 -> 0.37.0

* Update serde 1.0.219 -> 1.0.228

* Update serde_bytes 0.11.17 -> 0.11.19

* Update serde_json 1.0.142 -> 1.0.145

* Update serde_with 3.14.0 -> 3.15.0

* Update sentry 0.42.0 -> 0.45.0 and sentry-actix 0.42.0 -> 0.45.0

* Update spdx 0.10.9 -> 0.12.0

* Update sysinfo 0.36.1 -> 0.37.2

* Update tauri 2.7.0 -> 2.8.5

* Update tauri-build 2.3.1 -> 2.4.1

* Update tauri-plugin-deep-link 2.4.1 -> 2.4.3

* Update tauri-plugin-dialog 2.3.2 -> 2.4.0

* Update tauri-plugin-http 2.5.1 -> 2.5.2

* Update tauri-plugin-opener 2.4.0 -> 2.5.0

* Update tauri-plugin-os 2.3.0 -> 2.3.1

* Update tauri-plugin-single-instance 2.3.2 -> 2.3.4

* Update tempfile 3.20.0 -> 3.23.0

* Update thiserror 2.0.12 -> 2.0.17

* Update tracing-subscriber 0.3.19 -> 0.3.20

* Update url 2.5.4 -> 2.5.7

* Update uuid 1.17.0 -> 1.18.1

* Update webp 0.3.0 -> 0.3.1

* Update whoami 1.6.0 -> 1.6.1

* Note that windows and windows-core can't be updated yet

* Update zbus 5.9.0 -> 5.11.0

* Update zip 4.3.0 -> 6.0.0

* Fix build

* Enforce rustls crypto provider

* Refresh Cargo.lock

* Update transitive dependencies

* Bump Gradle usage to Java 17

* Use ubuntu-latest consistently across workflows

* Fix lint

* Fix lint in Rust

* Update native-dialog 0.9.0 -> 0.9.2

* Update regex 1.11.3 -> 1.12.2

* Update reqwest 0.12.23 -> 0.12.24

* Update rust_decimal 1.38.0 -> 1.39.0

* Remaining lock-only updates

* chore: move TLS impl of some other dependencies to aws-lc-rs

The AWS bloatware "virus" expands by sheer force of widespread adoption
by the ecosystem... 🫣

* chore(fmt): run Tombi

---------

Co-authored-by: Alejandro González <me@alegon.dev>
2025-10-15 20:45:47 +00:00

252 lines
8.0 KiB
Rust

use crate::database::models::DatabaseError;
use crate::database::redis::RedisPool;
use crate::models::analytics::{Download, PageView, Playtime};
use crate::routes::ApiError;
use dashmap::{DashMap, DashSet};
use redis::cmd;
use sqlx::PgPool;
use std::collections::HashMap;
// Redis key-namespace prefixes for the per-(IP, project) rate-limit counters
// maintained by `AnalyticsQueue::index` (keys look like "views:{ip}-{project}").
const DOWNLOADS_NAMESPACE: &str = "downloads";
const VIEWS_NAMESPACE: &str = "views";
/// In-memory buffers of analytics events, accumulated by the `add_*` methods
/// and flushed in batches by `index`.
pub struct AnalyticsQueue {
    /// Page views grouped by (stripped IP, project id).
    views_queue: DashMap<(u64, u64), Vec<PageView>>,
    /// Most recent download per (stripped IP, project id) — `insert`
    /// overwrites, so this also deduplicates between flushes.
    downloads_queue: DashMap<(u64, u64), Download>,
    /// Playtime samples; the set deduplicates identical samples.
    playtime_queue: DashSet<Playtime>,
}
impl Default for AnalyticsQueue {
fn default() -> Self {
Self::new()
}
}
// Batches analytics data points + transactions every few minutes
impl AnalyticsQueue {
    /// Creates an empty queue with capacity for ~1000 entries per category
    /// preallocated, avoiding rehashing during normal traffic bursts.
    pub fn new() -> Self {
        AnalyticsQueue {
            views_queue: DashMap::with_capacity(1000),
            downloads_queue: DashMap::with_capacity(1000),
            playtime_queue: DashSet::with_capacity(1000),
        }
    }

    /// Queues a page view, bucketed by (stripped IP, project id) so that
    /// `index` can rate-limit monetized views per visitor and project.
    /// (`strip_ip` presumably coarsens/anonymizes the address — see
    /// `crate::util::ip::strip_ip`; TODO confirm.)
    pub fn add_view(&self, page_view: PageView) {
        let ip_stripped = crate::util::ip::strip_ip(page_view.ip);
        self.views_queue
            .entry((ip_stripped, page_view.project_id))
            .or_default()
            .push(page_view);
    }

    /// Queues a download. `DashMap::insert` replaces any existing entry, so
    /// only the latest download per (stripped IP, project id) survives until
    /// the next `index` run — an in-memory dedup step.
    pub fn add_download(&self, download: Download) {
        let ip_stripped = crate::util::ip::strip_ip(download.ip);
        self.downloads_queue
            .insert((ip_stripped, download.project_id), download);
    }

    /// Queues a playtime sample; the `DashSet` deduplicates identical samples.
    pub fn add_playtime(&self, playtime: Playtime) {
        self.playtime_queue.insert(playtime);
    }

    /// Flushes every queued event:
    /// - playtimes -> ClickHouse `playtime` table,
    /// - views -> ClickHouse `views`, with Redis-counter-based capping of
    ///   monetized views per (IP, project),
    /// - downloads -> ClickHouse `downloads` plus batched Postgres updates of
    ///   the `versions`/`mods` download counters, with Redis-counter-based
    ///   rate limiting.
    pub async fn index(
        &self,
        client: clickhouse::Client,
        redis: &RedisPool,
        pool: &PgPool,
    ) -> Result<(), ApiError> {
        // Snapshot-and-reset each queue.
        // NOTE(review): clone() followed by clear() is not atomic — events
        // added by another thread between the two calls are silently dropped.
        // Confirm this loss window is acceptable for analytics data.
        let views_queue = self.views_queue.clone();
        self.views_queue.clear();
        let downloads_queue = self.downloads_queue.clone();
        self.downloads_queue.clear();
        let playtime_queue = self.playtime_queue.clone();
        self.playtime_queue.clear();

        if !playtime_queue.is_empty() {
            // Stream all playtime rows into ClickHouse in one insert.
            let mut playtimes = client.insert::<Playtime>("playtime").await?;
            for playtime in playtime_queue {
                playtimes.write(&playtime).await?;
            }
            playtimes.end().await?;
        }

        if !views_queue.is_empty() {
            // Flatten the map into two index-aligned vectors; the bool marks
            // whether a (ip, project) bucket is still monetization-eligible.
            let mut views_keys = Vec::new();
            let mut raw_views = Vec::new();
            for (key, views) in views_queue {
                views_keys.push(key);
                raw_views.push((views, true));
            }

            // Fetch the accumulated per-key view counters from Redis; MGET
            // returns results in the same order as views_keys.
            let mut redis =
                redis.pool.get().await.map_err(DatabaseError::RedisPool)?;
            let results = cmd("MGET")
                .arg(
                    views_keys
                        .iter()
                        .map(|x| format!("{}:{}-{}", VIEWS_NAMESPACE, x.0, x.1))
                        .collect::<Vec<_>>(),
                )
                .query_async::<Vec<Option<u32>>>(&mut redis)
                .await
                .map_err(DatabaseError::CacheError)?;

            let mut pipe = redis::pipe();
            for (idx, count) in results.into_iter().enumerate() {
                let key = &views_keys[idx];
                let new_count =
                    if let Some((views, monetized)) = raw_views.get_mut(idx) {
                        if let Some(count) = count {
                            // Bucket already past the cap: demonetize and
                            // skip the counter update for this key entirely.
                            if count > 3 {
                                *monetized = false;
                                continue;
                            }
                            // This batch pushes the bucket past the cap.
                            if (count + views.len() as u32) > 3 {
                                *monetized = false;
                            }
                            count + (views.len() as u32)
                        } else {
                            // No prior counter: start from this batch's size.
                            views.len() as u32
                        }
                    } else {
                        // Defensive fallback; results and raw_views should
                        // always be index-aligned.
                        1
                    };
                // Counters expire after 6 hours.
                pipe.atomic().set_ex(
                    format!("{}:{}-{}", VIEWS_NAMESPACE, key.0, key.1),
                    new_count,
                    6 * 60 * 60,
                );
            }
            pipe.query_async::<()>(&mut *redis)
                .await
                .map_err(DatabaseError::CacheError)?;

            // Write all views to ClickHouse. At most the first view of each
            // still-monetized bucket keeps its monetized flag; every other
            // view is forced to non-monetized.
            let mut views = client.insert::<PageView>("views").await?;
            for (all_views, monetized) in raw_views {
                for (idx, mut view) in all_views.into_iter().enumerate() {
                    if idx != 0 || !monetized {
                        view.monetized = false;
                    }
                    views.write(&view).await?;
                }
            }
            views.end().await?;
        }

        if !downloads_queue.is_empty() {
            // Parallel structures: keys for the Redis lookup, and downloads
            // keyed by position so over-the-cap entries can be removed.
            let mut downloads_keys = Vec::new();
            let raw_downloads = DashMap::new();
            for (index, (key, download)) in
                downloads_queue.into_iter().enumerate()
            {
                downloads_keys.push(key);
                raw_downloads.insert(index, download);
            }

            // Fetch the per-key download counters from Redis (index-aligned
            // with downloads_keys).
            let mut redis =
                redis.pool.get().await.map_err(DatabaseError::RedisPool)?;
            let results = cmd("MGET")
                .arg(
                    downloads_keys
                        .iter()
                        .map(|x| {
                            format!("{}:{}-{}", DOWNLOADS_NAMESPACE, x.0, x.1)
                        })
                        .collect::<Vec<_>>(),
                )
                .query_async::<Vec<Option<u32>>>(&mut redis)
                .await
                .map_err(DatabaseError::CacheError)?;

            let mut pipe = redis::pipe();
            for (idx, count) in results.into_iter().enumerate() {
                let key = &downloads_keys[idx];
                let new_count = if let Some(count) = count {
                    // Over the cap: drop this download entirely and leave the
                    // counter untouched (it expires on its own).
                    if count > 5 {
                        raw_downloads.remove(&idx);
                        continue;
                    }
                    count + 1
                } else {
                    1
                };
                // Counters expire after 6 hours.
                pipe.atomic().set_ex(
                    format!("{}:{}-{}", DOWNLOADS_NAMESPACE, key.0, key.1),
                    new_count,
                    6 * 60 * 60,
                );
            }
            pipe.query_async::<()>(&mut *redis)
                .await
                .map_err(DatabaseError::CacheError)?;

            // Stream the surviving downloads into ClickHouse while tallying
            // per-version and per-project totals for the Postgres counters.
            let mut transaction = pool.begin().await?;
            let mut downloads = client.insert::<Download>("downloads").await?;
            let mut version_downloads: HashMap<i64, i32> = HashMap::new();
            let mut project_downloads: HashMap<i64, i32> = HashMap::new();
            for (_, download) in raw_downloads {
                *version_downloads
                    .entry(download.version_id as i64)
                    .or_default() += 1;
                *project_downloads
                    .entry(download.project_id as i64)
                    .or_default() += 1;
                downloads.write(&download).await?;
            }

            // Batch-increment version download counts via unnest'ed
            // parallel id/amount arrays.
            sqlx::query(
                "
UPDATE versions v
SET downloads = v.downloads + x.amount
FROM unnest($1::BIGINT[], $2::int[]) AS x(id, amount)
WHERE v.id = x.id
",
            )
            .bind(version_downloads.keys().copied().collect::<Vec<_>>())
            .bind(version_downloads.values().copied().collect::<Vec<_>>())
            .execute(&mut *transaction)
            .await?;
            // Same batched increment for project (mods) download counts.
            sqlx::query(
                "
UPDATE mods m
SET downloads = m.downloads + x.amount
FROM unnest($1::BIGINT[], $2::int[]) AS x(id, amount)
WHERE m.id = x.id
",
            )
            .bind(project_downloads.keys().copied().collect::<Vec<_>>())
            .bind(project_downloads.values().copied().collect::<Vec<_>>())
            .execute(&mut *transaction)
            .await?;

            // NOTE(review): Postgres commits before the ClickHouse insert is
            // finalized; if `end()` fails, the SQL counters and the ClickHouse
            // rows diverge. Confirm this ordering is intentional.
            transaction.commit().await?;
            downloads.end().await?;
        }
        Ok(())
    }
}