Add launcher analytics (#661)

* Add more analytics

* finish hydra move

* Finish websocket flow

* add minecraft account flow

* Finish playtime vals + payout automation
This commit is contained in:
Geometrically
2023-08-02 14:43:04 -07:00
committed by GitHub
parent 4bb47d7e01
commit 039d26feeb
49 changed files with 2636 additions and 743 deletions

70
src/queue/analytics.rs Normal file
View File

@@ -0,0 +1,70 @@
use crate::models::analytics::{Download, PageView, Playtime};
use dashmap::DashSet;
pub struct AnalyticsQueue {
views_queue: DashSet<PageView>,
downloads_queue: DashSet<Download>,
playtime_queue: DashSet<Playtime>,
}
// Batches analytics data points + transactions every few minutes
impl AnalyticsQueue {
pub fn new() -> Self {
AnalyticsQueue {
views_queue: DashSet::with_capacity(1000),
downloads_queue: DashSet::with_capacity(1000),
playtime_queue: DashSet::with_capacity(1000),
}
}
pub async fn add_view(&self, page_view: PageView) {
self.views_queue.insert(page_view);
}
pub async fn add_download(&self, download: Download) {
self.downloads_queue.insert(download);
}
pub async fn add_playtime(&self, playtime: Playtime) {
self.playtime_queue.insert(playtime);
}
pub async fn index(&self, client: clickhouse::Client) -> Result<(), clickhouse::error::Error> {
let views_queue = self.views_queue.clone();
self.views_queue.clear();
let downloads_queue = self.downloads_queue.clone();
self.downloads_queue.clear();
let playtime_queue = self.playtime_queue.clone();
self.playtime_queue.clear();
if !views_queue.is_empty() || !downloads_queue.is_empty() || !playtime_queue.is_empty() {
let mut views = client.insert("views")?;
for view in views_queue {
views.write(&view).await?;
}
views.end().await?;
let mut downloads = client.insert("downloads")?;
for download in downloads_queue {
downloads.write(&download).await?;
}
downloads.end().await?;
let mut playtimes = client.insert("playtime")?;
for playtime in playtime_queue {
playtimes.write(&playtime).await?;
}
playtimes.end().await?;
}
Ok(())
}
}

82
src/queue/maxmind.rs Normal file
View File

@@ -0,0 +1,82 @@
use flate2::read::GzDecoder;
use log::warn;
use maxminddb::geoip2::Country;
use std::io::{Cursor, Read};
use std::net::Ipv6Addr;
use tar::Archive;
use tokio::sync::RwLock;
pub struct MaxMindIndexer {
pub reader: RwLock<Option<maxminddb::Reader<Vec<u8>>>>,
}
impl MaxMindIndexer {
pub async fn new() -> Result<Self, reqwest::Error> {
let reader = MaxMindIndexer::inner_index(false).await.ok().flatten();
Ok(MaxMindIndexer {
reader: RwLock::new(reader),
})
}
pub async fn index(&self) -> Result<(), reqwest::Error> {
let reader = MaxMindIndexer::inner_index(false).await?;
if let Some(reader) = reader {
let mut reader_new = self.reader.write().await;
*reader_new = Some(reader);
}
Ok(())
}
async fn inner_index(
should_panic: bool,
) -> Result<Option<maxminddb::Reader<Vec<u8>>>, reqwest::Error> {
let response = reqwest::get(
format!(
"https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-Country&license_key={}&suffix=tar.gz",
dotenvy::var("MAXMIND_LICENSE_KEY").unwrap()
)
).await?.bytes().await.unwrap().to_vec();
let tarfile = GzDecoder::new(Cursor::new(response));
let mut archive = Archive::new(tarfile);
if let Ok(entries) = archive.entries() {
for mut file in entries.flatten() {
if let Ok(path) = file.header().path() {
if path.extension().and_then(|x| x.to_str()) == Some("mmdb") {
let mut buf = Vec::new();
file.read_to_end(&mut buf).unwrap();
let reader = maxminddb::Reader::from_source(buf).unwrap();
return Ok(Some(reader));
}
}
}
}
if should_panic {
panic!("Unable to download maxmind database- did you get a license key?")
} else {
warn!("Unable to download maxmind database.");
Ok(None)
}
}
pub async fn query(&self, ip: Ipv6Addr) -> Option<String> {
let maxmind = self.reader.read().await;
if let Some(ref maxmind) = *maxmind {
maxmind
.lookup::<Country>(ip.into())
.ok()
.and_then(|x| x.country.and_then(|x| x.iso_code.map(|x| x.to_string())))
} else {
None
}
}
}

View File

@@ -1,3 +1,6 @@
pub mod analytics;
pub mod download;
pub mod maxmind;
pub mod payouts;
pub mod session;
pub mod socket;

View File

@@ -1,9 +1,12 @@
use crate::models::projects::MonetizationStatus;
use crate::routes::ApiError;
use crate::util::env::parse_var;
use base64::Engine;
use chrono::{DateTime, Duration, Utc};
use chrono::{DateTime, Datelike, Duration, Utc, Weekday};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::PgPool;
use std::collections::HashMap;
pub struct PayoutsQueue {
@@ -197,3 +200,210 @@ impl PayoutsQueue {
Ok(Decimal::ZERO)
}
}
pub async fn process_payout(
pool: &PgPool,
redis: &deadpool_redis::Pool,
client: &clickhouse::Client,
) -> Result<(), ApiError> {
let start: DateTime<Utc> = DateTime::from_utc(
Utc::now()
.date_naive()
.and_hms_nano_opt(0, 0, 0, 0)
.unwrap_or_default(),
Utc,
);
let results = sqlx::query!(
"SELECT EXISTS(SELECT 1 FROM payouts_values WHERE created = $1)",
start,
)
.fetch_one(pool)
.await?;
if results.exists.unwrap_or(false) {
return Ok(());
}
let end = start + Duration::days(1);
#[derive(Deserialize, clickhouse::Row)]
struct ProjectMultiplier {
pub page_views: u64,
pub project_id: u64,
}
let (views_values, views_sum, downloads_values, downloads_sum) = futures::future::try_join4(
client
.query(
r#"
SELECT COUNT(id) page_views, project_id
FROM views
WHERE (recorded BETWEEN ? AND ?) AND (project_id != 0)
GROUP BY project_id
ORDER BY page_views DESC
"#,
)
.bind(start.timestamp())
.bind(end.timestamp())
.fetch_all::<ProjectMultiplier>(),
client
.query("SELECT COUNT(id) FROM views WHERE (recorded BETWEEN ? AND ?) AND (project_id != 0)")
.bind(start.timestamp())
.bind(end.timestamp())
.fetch_one::<u64>(),
client
.query(
r#"
SELECT COUNT(id) page_views, project_id
FROM downloads
WHERE (recorded BETWEEN ? AND ?) AND (user_id != 0)
GROUP BY project_id
ORDER BY page_views DESC
"#,
)
.bind(start.timestamp())
.bind(end.timestamp())
.fetch_all::<ProjectMultiplier>(),
client
.query("SELECT COUNT(id) FROM downloads WHERE (recorded BETWEEN ? AND ?) AND (user_id != 0)")
.bind(start.timestamp())
.bind(end.timestamp())
.fetch_one::<u64>(),
)
.await?;
let mut transaction = pool.begin().await?;
struct PayoutMultipliers {
sum: u64,
values: HashMap<u64, u64>,
}
let mut views_values = views_values
.into_iter()
.map(|x| (x.project_id, x.page_views))
.collect::<HashMap<u64, u64>>();
let downloads_values = downloads_values
.into_iter()
.map(|x| (x.project_id, x.page_views))
.collect::<HashMap<u64, u64>>();
views_values.extend(downloads_values);
let multipliers: PayoutMultipliers = PayoutMultipliers {
sum: downloads_sum + views_sum,
values: views_values,
};
struct Project {
// user_id, payouts_split
team_members: Vec<(i64, Decimal)>,
}
let mut projects_map: HashMap<i64, Project> = HashMap::new();
use futures::TryStreamExt;
sqlx::query!(
"
SELECT m.id id, tm.user_id user_id, tm.payouts_split payouts_split
FROM mods m
INNER JOIN team_members tm on m.team_id = tm.team_id AND tm.accepted = TRUE
WHERE m.id = ANY($1) AND m.monetization_status = $2
",
&multipliers
.values
.keys()
.map(|x| *x as i64)
.collect::<Vec<i64>>(),
MonetizationStatus::Monetized.as_str(),
)
.fetch_many(&mut *transaction)
.try_for_each(|e| {
if let Some(row) = e.right() {
if let Some(project) = projects_map.get_mut(&row.id) {
project.team_members.push((row.user_id, row.payouts_split));
} else {
projects_map.insert(
row.id,
Project {
team_members: vec![(row.user_id, row.payouts_split)],
},
);
}
}
futures::future::ready(Ok(()))
})
.await?;
let amount = Decimal::from(parse_var::<u64>("PAYOUTS_BUDGET").unwrap_or(0));
let days = Decimal::from(28);
let weekdays = Decimal::from(20);
let weekend_bonus = Decimal::from(5) / Decimal::from(4);
let weekday_amount = amount / (weekdays + (weekend_bonus) * (days - weekdays));
let weekend_amount = weekday_amount * weekend_bonus;
let payout = match start.weekday() {
Weekday::Sat | Weekday::Sun => weekend_amount,
_ => weekday_amount,
};
for (id, project) in projects_map {
if let Some(value) = &multipliers.values.get(&(id as u64)) {
let project_multiplier: Decimal =
Decimal::from(**value) / Decimal::from(multipliers.sum);
let sum_splits: Decimal = project.team_members.iter().map(|x| x.1).sum();
let mut clear_cache_users = Vec::new();
if sum_splits > Decimal::ZERO {
for (user_id, split) in project.team_members {
let payout: Decimal = payout * project_multiplier * (split / sum_splits);
if payout > Decimal::ZERO {
sqlx::query!(
"
INSERT INTO payouts_values (user_id, mod_id, amount, created)
VALUES ($1, $2, $3, $4)
",
user_id,
id,
payout,
start
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
UPDATE users
SET balance = balance + $1
WHERE id = $2
",
payout,
user_id
)
.execute(&mut *transaction)
.await?;
clear_cache_users.push(user_id);
}
}
}
crate::database::models::User::clear_caches(
&clear_cache_users
.into_iter()
.map(|x| (crate::database::models::UserId(x), None))
.collect::<Vec<_>>(),
redis,
)
.await?;
}
}
transaction.commit().await?;
Ok(())
}

15
src/queue/socket.rs Normal file
View File

@@ -0,0 +1,15 @@
//! "Database" for Hydra
use actix_ws::Session;
use dashmap::DashMap;
pub struct ActiveSockets {
pub auth_sockets: DashMap<String, Session>,
}
impl Default for ActiveSockets {
fn default() -> Self {
Self {
auth_sockets: DashMap::new(),
}
}
}