Fix download counts (#746)

* Fix download counts
* remove unsafe send
* update indexing time
* run prep
* run prep again
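
The core change below replaces the DashSet<Download> pending queue with a DashMap<String, Download> keyed by the requester's network plus the project, so repeated downloads collapse into one entry before indexing. A minimal sketch of that key scheme, assuming (as in the diff) that ip is an Ipv6Addr and project_id a u64; download_key is a hypothetical name, the real code builds the string inline in add_download:

    use std::net::Ipv6Addr;

    fn download_key(ip: Ipv6Addr, project_id: u64) -> String {
        // Keep only the upper 64 bits of the address (the /64 prefix), so
        // hosts rotating addresses inside one IPv6 subnet share a key.
        let octets = ip.octets();
        let ip_stripped = u64::from_be_bytes(octets[..8].try_into().unwrap());
        format!("{}-{}", ip_stripped, project_id)
    }

    fn main() {
        let a = download_key("2001:db8::1".parse().unwrap(), 42);
        let b = download_key("2001:db8::2".parse().unwrap(), 42);
        assert_eq!(a, b); // same /64 prefix and project: one queue slot
    }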
@@ -1,16 +1,16 @@
+use crate::database::models::DatabaseError;
 use crate::models::analytics::{Download, PageView, Playtime};
-use dashmap::DashSet;
+use crate::routes::ApiError;
+use dashmap::{DashMap, DashSet};
+use redis::cmd;
+use sqlx::PgPool;
+use crate::database::redis::RedisPool;
 
-#[cfg(test)]
-mod tests;
-
 const VIEWS_TABLENAME: &str = "views";
 const DOWNLOADS_TABLENAME: &str = "downloads";
 const PLAYTIME_TABLENAME: &str = "playtime";
+const DOWNLOADS_NAMESPACE: &str = "downloads";
 
 pub struct AnalyticsQueue {
     views_queue: DashSet<PageView>,
-    downloads_queue: DashSet<Download>,
+    downloads_queue: DashMap<String, Download>,
     playtime_queue: DashSet<Playtime>,
 }
 
@@ -25,7 +25,7 @@ impl AnalyticsQueue {
     pub fn new() -> Self {
         AnalyticsQueue {
             views_queue: DashSet::with_capacity(1000),
-            downloads_queue: DashSet::with_capacity(1000),
+            downloads_queue: DashMap::with_capacity(1000),
            playtime_queue: DashSet::with_capacity(1000),
         }
    }
@@ -35,45 +35,138 @@ impl AnalyticsQueue {
     }
 
     pub fn add_download(&self, download: Download) {
-        self.downloads_queue.insert(download);
+        let octets = download.ip.octets();
+        let ip_stripped = u64::from_be_bytes([
+            octets[0], octets[1], octets[2], octets[3], octets[4], octets[5], octets[6], octets[7],
+        ]);
+        self.downloads_queue
+            .insert(format!("{}-{}", ip_stripped, download.project_id), download);
     }
 
     pub fn add_playtime(&self, playtime: Playtime) {
         self.playtime_queue.insert(playtime);
     }
 
-    pub async fn index(&self, client: clickhouse::Client) -> Result<(), clickhouse::error::Error> {
-        Self::index_queue(&client, &self.views_queue, VIEWS_TABLENAME).await?;
-        Self::index_queue(&client, &self.downloads_queue, DOWNLOADS_TABLENAME).await?;
-        Self::index_queue(&client, &self.playtime_queue, PLAYTIME_TABLENAME).await?;
+    pub async fn index(
+        &self,
+        client: clickhouse::Client,
+        redis: &RedisPool,
+        pool: &PgPool,
+    ) -> Result<(), ApiError> {
+        let views_queue = self.views_queue.clone();
+        self.views_queue.clear();
+
+        let downloads_queue = self.downloads_queue.clone();
+        self.downloads_queue.clear();
+
+        let playtime_queue = self.playtime_queue.clone();
+        self.playtime_queue.clear();
+
+        if !views_queue.is_empty() {
+            let mut views = client.insert("views")?;
+
+            for view in views_queue {
+                views.write(&view).await?;
+            }
+
+            views.end().await?;
+        }
+
+        if !playtime_queue.is_empty() {
+            let mut playtimes = client.insert("playtime")?;
+
+            for playtime in playtime_queue {
+                playtimes.write(&playtime).await?;
+            }
+
+            playtimes.end().await?;
+        }
+
+        if !downloads_queue.is_empty() {
+            let mut downloads_keys = Vec::new();
+            let raw_downloads = DashMap::new();
+
+            for (index, (key, download)) in downloads_queue.into_iter().enumerate() {
+                downloads_keys.push(key);
+                raw_downloads.insert(index, download);
+            }
+
+            let mut redis = redis.pool.get().await.map_err(DatabaseError::RedisPool)?;
+
+            let results = cmd("MGET")
+                .arg(
+                    downloads_keys
+                        .iter()
+                        .map(|x| format!("{}:{}", DOWNLOADS_NAMESPACE, x))
+                        .collect::<Vec<_>>(),
+                )
+                .query_async::<_, Vec<Option<u32>>>(&mut redis)
+                .await
+                .map_err(DatabaseError::CacheError)?;
+
+            let mut pipe = redis::pipe();
+            for (idx, count) in results.into_iter().enumerate() {
+                let key = &downloads_keys[idx];
+
+                let new_count = if let Some(count) = count {
+                    if count > 5 {
+                        raw_downloads.remove(&idx);
+                        continue;
+                    }
+
+                    count + 1
+                } else {
+                    1
+                };
+
+                pipe.atomic().set_ex(
+                    format!("{}:{}", DOWNLOADS_NAMESPACE, key),
+                    new_count,
+                    6 * 60 * 60,
+                );
+            }
+            pipe.query_async(&mut *redis)
+                .await
+                .map_err(DatabaseError::CacheError)?;
+
+            let version_ids = raw_downloads
+                .iter()
+                .map(|x| x.version_id as i64)
+                .collect::<Vec<_>>();
+            let project_ids = raw_downloads
+                .iter()
+                .map(|x| x.project_id as i64)
+                .collect::<Vec<_>>();
+
+            let mut transaction = pool.begin().await?;
+            let mut downloads = client.insert("downloads")?;
+
+            for (_, download) in raw_downloads {
+                downloads.write(&download).await?;
+            }
+
+            sqlx::query!(
+                "UPDATE versions
+                SET downloads = downloads + 1
+                WHERE id = ANY($1)",
+                &version_ids
+            )
+            .execute(&mut *transaction)
+            .await?;
+
+            sqlx::query!(
+                "UPDATE mods
+                SET downloads = downloads + 1
+                WHERE id = ANY($1)",
+                &project_ids
+            )
+            .execute(&mut *transaction)
+            .await?;
+
+            transaction.commit().await?;
+            downloads.end().await?;
+        }
 
         Ok(())
     }
-
-    async fn index_queue<T>(
-        client: &clickhouse::Client,
-        queue: &DashSet<T>,
-        table_name: &str,
-    ) -> Result<(), clickhouse::error::Error>
-    where
-        T: serde::Serialize + Eq + std::hash::Hash + Clone + clickhouse::Row,
-    {
-        if queue.is_empty() {
-            return Ok(());
-        }
-
-        let current_queue = queue.clone();
-        queue.clear();
-
-        let mut inserter = client.inserter(table_name)?;
-
-        for row in current_queue {
-            inserter.write(&row).await?;
-            inserter.commit().await?;
-        }
-
-        inserter.end().await?;
-
-        Ok(())
-    }
 }
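The Redis counters in the new index body throttle how many rows per key reach ClickHouse and Postgres: an existing count above five drops the download, anything else increments the counter and re-arms a six-hour TTL. A pure-function restatement of that branch (next_count is a hypothetical helper, not in the diff):

    fn next_count(existing: Option<u32>) -> Option<u32> {
        match existing {
            Some(count) if count > 5 => None, // throttled: drop this download
            Some(count) => Some(count + 1),   // seen before: bump the counter
            None => Some(1),                  // first download in the window
        }
    }

    fn main() {
        assert_eq!(next_count(None), Some(1));
        assert_eq!(next_count(Some(5)), Some(6)); // sixth download still counts
        assert_eq!(next_count(Some(6)), None);    // seventh and later are dropped
    }
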
@@ -1,128 +0,0 @@
-use futures::Future;
-use uuid::Uuid;
-
-use super::*;
-use crate::clickhouse::init_client_with_database;
-use std::net::Ipv6Addr;
-
-#[tokio::test]
-async fn test_indexing() {
-    with_test_clickhouse_db(|clickhouse_client| async move {
-        let analytics = AnalyticsQueue::new();
-
-        analytics.add_download(get_default_download());
-        analytics.add_playtime(get_default_playtime());
-        analytics.add_view(get_default_views());
-
-        analytics.index(clickhouse_client.clone()).await.unwrap();
-        assert_table_counts(&clickhouse_client, 1, 1, 1).await;
-
-        analytics.index(clickhouse_client.clone()).await.unwrap();
-        assert_table_counts(&clickhouse_client, 1, 1, 1).await;
-    })
-    .await;
-}
-
-#[tokio::test]
-async fn can_insert_many_downloads() {
-    with_test_clickhouse_db(|clickhouse_client| async move {
-        let analytics = AnalyticsQueue::new();
-        let n_downloads = 100_000;
-
-        for _ in 0..n_downloads {
-            analytics.add_download(get_default_download());
-        }
-
-        analytics.index(clickhouse_client.clone()).await.unwrap();
-        assert_table_count(DOWNLOADS_TABLENAME, &clickhouse_client, n_downloads).await;
-    })
-    .await;
-}
-
-async fn assert_table_counts(
-    client: &clickhouse::Client,
-    downloads: u64,
-    playtimes: u64,
-    views: u64,
-) {
-    assert_table_count(DOWNLOADS_TABLENAME, client, downloads).await;
-    assert_table_count(PLAYTIME_TABLENAME, client, playtimes).await;
-    assert_table_count(VIEWS_TABLENAME, client, views).await;
-}
-
-async fn assert_table_count(table_name: &str, client: &clickhouse::Client, expected_count: u64) {
-    let count = client
-        .query(&format!("SELECT COUNT(*) from {table_name}"))
-        .fetch_one::<u64>()
-        .await
-        .unwrap();
-    assert_eq!(expected_count, count);
-}
-
-async fn with_test_clickhouse_db<Fut>(f: impl FnOnce(clickhouse::Client) -> Fut)
-where
-    Fut: Future<Output = ()>,
-{
-    let db_name = format!("test_{}", uuid::Uuid::new_v4().as_simple());
-    println!("Clickhouse test db: {}", db_name);
-    let clickhouse_client = init_client_with_database(&db_name)
-        .await
-        .expect("A real clickhouse instance should be running locally");
-
-    f(clickhouse_client.clone()).await;
-
-    clickhouse_client
-        .query(&format!("DROP DATABASE IF EXISTS {db_name}"))
-        .execute()
-        .await
-        .unwrap();
-}
-
-fn get_default_download() -> Download {
-    Download {
-        id: Uuid::new_v4(),
-        recorded: Default::default(),
-        domain: Default::default(),
-        site_path: Default::default(),
-        user_id: Default::default(),
-        project_id: Default::default(),
-        version_id: Default::default(),
-        ip: get_default_ipv6(),
-        country: Default::default(),
-        user_agent: Default::default(),
-        headers: Default::default(),
-    }
-}
-
-fn get_default_playtime() -> Playtime {
-    Playtime {
-        id: Uuid::new_v4(),
-        recorded: Default::default(),
-        seconds: Default::default(),
-        user_id: Default::default(),
-        project_id: Default::default(),
-        version_id: Default::default(),
-        loader: Default::default(),
-        game_version: Default::default(),
-        parent: Default::default(),
-    }
-}
-
-fn get_default_views() -> PageView {
-    PageView {
-        id: Uuid::new_v4(),
-        recorded: Default::default(),
-        domain: Default::default(),
-        site_path: Default::default(),
-        user_id: Default::default(),
-        project_id: Default::default(),
-        ip: get_default_ipv6(),
-        country: Default::default(),
-        user_agent: Default::default(),
-        headers: Default::default(),
-    }
-}
-
-fn get_default_ipv6() -> Ipv6Addr {
-    Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)
-}
@@ -1,64 +0,0 @@
-use crate::database::models::{DatabaseError, ProjectId, VersionId};
-use sqlx::PgPool;
-use tokio::sync::Mutex;
-
-pub struct DownloadQueue {
-    queue: Mutex<Vec<(ProjectId, VersionId)>>,
-}
-
-impl Default for DownloadQueue {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-// Batches download transactions every thirty seconds
-impl DownloadQueue {
-    pub fn new() -> Self {
-        DownloadQueue {
-            queue: Mutex::new(Vec::with_capacity(1000)),
-        }
-    }
-    pub async fn add(&self, project_id: ProjectId, version_id: VersionId) {
-        self.queue.lock().await.push((project_id, version_id));
-    }
-
-    pub async fn take(&self) -> Vec<(ProjectId, VersionId)> {
-        let mut queue = self.queue.lock().await;
-        let len = queue.len();
-
-        std::mem::replace(&mut queue, Vec::with_capacity(len))
-    }
-
-    pub async fn index(&self, pool: &PgPool) -> Result<(), DatabaseError> {
-        let queue = self.take().await;
-
-        if !queue.is_empty() {
-            let mut transaction = pool.begin().await?;
-
-            for (project_id, version_id) in queue {
-                sqlx::query!(
-                    "UPDATE versions
-                    SET downloads = downloads + 1
-                    WHERE (id = $1)",
-                    version_id as VersionId
-                )
-                .execute(&mut *transaction)
-                .await?;
-
-                sqlx::query!(
-                    "UPDATE mods
-                    SET downloads = downloads + 1
-                    WHERE (id = $1)",
-                    project_id as ProjectId
-                )
-                .execute(&mut *transaction)
-                .await?;
-            }
-
-            transaction.commit().await?;
-        }
-
-        Ok(())
-    }
-}
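The DownloadQueue deleted above issued two UPDATE statements per download inside one transaction; the new index path instead collects ids and updates each table once with an ANY($1) array bind. A sketch of that batched form under the same schema, using the unchecked query API rather than the query! macro so it compiles without a live database (bump_version_downloads is a hypothetical name):

    async fn bump_version_downloads(
        pool: &sqlx::PgPool,
        version_ids: &[i64],
    ) -> Result<(), sqlx::Error> {
        // One statement for the whole batch instead of one round trip per row.
        sqlx::query("UPDATE versions SET downloads = downloads + 1 WHERE id = ANY($1)")
            .bind(version_ids)
            .execute(pool)
            .await?;
        Ok(())
    }
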
@@ -1,5 +1,4 @@
 pub mod analytics;
-pub mod download;
 pub mod maxmind;
 pub mod payouts;
 pub mod session;
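
The commit message also mentions updating the indexing time; the scheduling itself sits outside this diff. For context, a hypothetical driver loop that an AnalyticsQueue like the one above could hang off, assuming tokio and the handles from the new index signature (the 60-second interval is a placeholder, not the project's real cadence):

    use std::{sync::Arc, time::Duration};

    async fn run_analytics_indexing(
        queue: Arc<AnalyticsQueue>,
        client: clickhouse::Client,
        redis: RedisPool,
        pool: sqlx::PgPool,
    ) {
        // Flush the in-memory queues on a fixed cadence.
        let mut ticker = tokio::time::interval(Duration::from_secs(60));
        loop {
            ticker.tick().await;
            if let Err(e) = queue.index(client.clone(), &redis, &pool).await {
                eprintln!("analytics indexing failed: {:?}", e);
            }
        }
    }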