Batch inserts [MOD-555] (#726)

* Batch a bunch of inserts, but still more to do

* Insert many for clickhouse (+ tests)

* Batch the remaining ones except those requiring deduplication

* Risky dedups

* Bit o cleanup and formatting

* cargo sqlx prepare

* Add test around batch editing project categories

* Add struct to satisfy clippy

* Fix silly mistake that was caught by the tests!

* Leave room for growth in dummy_data
This commit is contained in:
Jackson Kruger
2023-10-11 13:32:58 -05:00
committed by GitHub
parent dfa43f3c5a
commit d92272ffa0
23 changed files with 1208 additions and 929 deletions

View File

@@ -1,6 +1,13 @@
use crate::models::analytics::{Download, PageView, Playtime};
use dashmap::DashSet;
#[cfg(test)]
mod tests;
const VIEWS_TABLENAME: &str = "views";
const DOWNLOADS_TABLENAME: &str = "downloads";
const PLAYTIME_TABLENAME: &str = "playtime";
pub struct AnalyticsQueue {
views_queue: DashSet<PageView>,
downloads_queue: DashSet<Download>,
@@ -17,54 +24,50 @@ impl AnalyticsQueue {
}
}
pub async fn add_view(&self, page_view: PageView) {
pub fn add_view(&self, page_view: PageView) {
self.views_queue.insert(page_view);
}
pub async fn add_download(&self, download: Download) {
pub fn add_download(&self, download: Download) {
self.downloads_queue.insert(download);
}
pub async fn add_playtime(&self, playtime: Playtime) {
pub fn add_playtime(&self, playtime: Playtime) {
self.playtime_queue.insert(playtime);
}
pub async fn index(&self, client: clickhouse::Client) -> Result<(), clickhouse::error::Error> {
let views_queue = self.views_queue.clone();
self.views_queue.clear();
Self::index_queue(&client, &self.views_queue, VIEWS_TABLENAME).await?;
Self::index_queue(&client, &self.downloads_queue, DOWNLOADS_TABLENAME).await?;
Self::index_queue(&client, &self.playtime_queue, PLAYTIME_TABLENAME).await?;
let downloads_queue = self.downloads_queue.clone();
self.downloads_queue.clear();
Ok(())
}
let playtime_queue = self.playtime_queue.clone();
self.playtime_queue.clear();
if !views_queue.is_empty() || !downloads_queue.is_empty() || !playtime_queue.is_empty() {
let mut views = client.insert("views")?;
for view in views_queue {
views.write(&view).await?;
}
views.end().await?;
let mut downloads = client.insert("downloads")?;
for download in downloads_queue {
downloads.write(&download).await?;
}
downloads.end().await?;
let mut playtimes = client.insert("playtime")?;
for playtime in playtime_queue {
playtimes.write(&playtime).await?;
}
playtimes.end().await?;
/// Flush every row currently in `queue` into the ClickHouse table
/// `table_name` as one batched insert, emptying the queue.
///
/// NOTE(review): the queue is snapshotted via `clone()` and then `clear()`ed;
/// any row another task inserts between those two calls is discarded without
/// ever being written. TODO confirm this loss window is acceptable for
/// analytics data.
async fn index_queue<T>(
client: &clickhouse::Client,
queue: &DashSet<T>,
table_name: &str,
) -> Result<(), clickhouse::error::Error>
where
T: serde::Serialize + Eq + std::hash::Hash + Clone + clickhouse::Row,
{
// Nothing queued: skip creating an inserter entirely.
if queue.is_empty() {
return Ok(());
}
// Snapshot, then empty the shared queue (see loss-window note above).
let current_queue = queue.clone();
queue.clear();
let mut inserter = client.inserter(table_name)?;
for row in current_queue {
inserter.write(&row).await?;
// commit() per row — presumably the inserter batches internally per its
// own thresholds; verify against the clickhouse crate docs.
inserter.commit().await?;
}
// end() flushes whatever the inserter still has buffered.
inserter.end().await?;
Ok(())
}
}

View File

@@ -0,0 +1,128 @@
use futures::Future;
use uuid::Uuid;
use super::*;
use crate::clickhouse::init_client_with_database;
use std::net::Ipv6Addr;
// Verifies a basic round trip: one row per queue lands in each table, and a
// second `index` call is a no-op because the queues were drained by the first.
// Requires a local ClickHouse instance (see `with_test_clickhouse_db`).
#[tokio::test]
async fn test_indexing() {
with_test_clickhouse_db(|clickhouse_client| async move {
// One row in each of the three queues.
let analytics = AnalyticsQueue::new();
analytics.add_download(get_default_download());
analytics.add_playtime(get_default_playtime());
analytics.add_view(get_default_views());
// First flush writes exactly one row per table.
analytics.index(clickhouse_client.clone()).await.unwrap();
assert_table_counts(&clickhouse_client, 1, 1, 1).await;
// Second flush must not re-insert anything: counts stay at 1.
analytics.index(clickhouse_client.clone()).await.unwrap();
assert_table_counts(&clickhouse_client, 1, 1, 1).await;
})
.await;
}
// Stress test: the batched insert path must handle a large queue in a single
// `index` call. Requires a local ClickHouse instance.
#[tokio::test]
async fn can_insert_many_downloads() {
with_test_clickhouse_db(|clickhouse_client| async move {
let analytics = AnalyticsQueue::new();
// 100k rows; each gets a fresh Uuid (see `get_default_download`), so the
// dedup set keeps all of them.
let n_downloads = 100_000;
for _ in 0..n_downloads {
analytics.add_download(get_default_download());
}
analytics.index(clickhouse_client.clone()).await.unwrap();
assert_table_count(DOWNLOADS_TABLENAME, &clickhouse_client, n_downloads).await;
})
.await;
}
/// Assert the row counts of all three analytics tables in one call.
async fn assert_table_counts(
    client: &clickhouse::Client,
    downloads: u64,
    playtimes: u64,
    views: u64,
) {
    // Same table order as the individual assertions: downloads, playtime, views.
    let expectations = [
        (DOWNLOADS_TABLENAME, downloads),
        (PLAYTIME_TABLENAME, playtimes),
        (VIEWS_TABLENAME, views),
    ];
    for (table, expected) in expectations {
        assert_table_count(table, client, expected).await;
    }
}
/// Run `SELECT COUNT(*)` against `table_name` and assert it equals
/// `expected_count`.
async fn assert_table_count(table_name: &str, client: &clickhouse::Client, expected_count: u64) {
    let sql = format!("SELECT COUNT(*) from {table_name}");
    let actual = client.query(&sql).fetch_one::<u64>().await.unwrap();
    assert_eq!(expected_count, actual);
}
async fn with_test_clickhouse_db<Fut>(f: impl FnOnce(clickhouse::Client) -> Fut)
where
Fut: Future<Output = ()>,
{
let db_name = format!("test_{}", uuid::Uuid::new_v4().as_simple());
println!("Clickhouse test db: {}", db_name);
let clickhouse_client = init_client_with_database(&db_name)
.await
.expect("A real clickhouse instance should be running locally");
f(clickhouse_client.clone()).await;
clickhouse_client
.query(&format!("DROP DATABASE IF EXISTS {db_name}"))
.execute()
.await
.unwrap();
}
/// Build a `Download` row with a fresh random id, the all-zero IPv6 address,
/// and every other field defaulted.
fn get_default_download() -> Download {
    // Fresh id per call so repeated rows are not deduplicated by the queue.
    let id = Uuid::new_v4();
    let ip = get_default_ipv6();
    Download {
        id,
        ip,
        recorded: Default::default(),
        domain: Default::default(),
        site_path: Default::default(),
        user_id: Default::default(),
        project_id: Default::default(),
        version_id: Default::default(),
        country: Default::default(),
        user_agent: Default::default(),
        headers: Default::default(),
    }
}
/// Build a `Playtime` row with a fresh random id and every other field
/// defaulted.
fn get_default_playtime() -> Playtime {
    // Fresh id per call so repeated rows are not deduplicated by the queue.
    let id = Uuid::new_v4();
    Playtime {
        id,
        recorded: Default::default(),
        seconds: Default::default(),
        user_id: Default::default(),
        project_id: Default::default(),
        version_id: Default::default(),
        loader: Default::default(),
        game_version: Default::default(),
        parent: Default::default(),
    }
}
/// Build a `PageView` row with a fresh random id, the all-zero IPv6 address,
/// and every other field defaulted.
fn get_default_views() -> PageView {
    // Fresh id per call so repeated rows are not deduplicated by the queue.
    let id = Uuid::new_v4();
    let ip = get_default_ipv6();
    PageView {
        id,
        ip,
        recorded: Default::default(),
        domain: Default::default(),
        site_path: Default::default(),
        user_id: Default::default(),
        project_id: Default::default(),
        country: Default::default(),
        user_agent: Default::default(),
        headers: Default::default(),
    }
}
/// The all-zero ("unspecified") IPv6 address used by every dummy analytics row.
fn get_default_ipv6() -> Ipv6Addr {
    // Identical to `Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)` (`::`), but the
    // named constant states the intent directly.
    Ipv6Addr::UNSPECIFIED
}

View File

@@ -355,6 +355,8 @@ pub async fn process_payout(
};
let mut clear_cache_users = Vec::new();
let (mut insert_user_ids, mut insert_project_ids, mut insert_payouts, mut insert_starts) =
(Vec::new(), Vec::new(), Vec::new(), Vec::new());
for (id, project) in projects_map {
if let Some(value) = &multipliers.values.get(&(id as u64)) {
let project_multiplier: Decimal =
@@ -367,18 +369,10 @@ pub async fn process_payout(
let payout: Decimal = payout * project_multiplier * (split / sum_splits);
if payout > Decimal::ZERO {
sqlx::query!(
"
INSERT INTO payouts_values (user_id, mod_id, amount, created)
VALUES ($1, $2, $3, $4)
",
user_id,
id,
payout,
start
)
.execute(&mut *transaction)
.await?;
insert_user_ids.push(user_id);
insert_project_ids.push(id);
insert_payouts.push(payout);
insert_starts.push(start);
sqlx::query!(
"
@@ -399,6 +393,19 @@ pub async fn process_payout(
}
}
sqlx::query!(
"
INSERT INTO payouts_values (user_id, mod_id, amount, created)
SELECT * FROM UNNEST ($1::bigint[], $2::bigint[], $3::numeric[], $4::timestamptz[])
",
&insert_user_ids[..],
&insert_project_ids[..],
&insert_payouts[..],
&insert_starts[..]
)
.execute(&mut *transaction)
.await?;
if !clear_cache_users.is_empty() {
crate::database::models::User::clear_caches(
&clear_cache_users