Merge commit '74cf3f076eff43755bb4bef62f1c1bb3fc0e6c2a' into feature-clean

This commit is contained in:
2025-05-26 17:59:09 +03:00
497 changed files with 15033 additions and 9421 deletions

View File

@@ -0,0 +1,122 @@
use crate::worlds::{DisplayStatus, WorldType};
use paste::paste;
use std::collections::HashMap;
#[derive(Debug, Clone, Default)]
pub struct AttachedWorldData {
pub display_status: DisplayStatus,
}
impl AttachedWorldData {
pub async fn get_for_world(
instance: &str,
world_type: WorldType,
world_id: &str,
exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite>,
) -> crate::Result<Option<Self>> {
let world_type = world_type.as_str();
let attached_data = sqlx::query!(
"
SELECT display_status
FROM attached_world_data
WHERE profile_path = $1 and world_type = $2 and world_id = $3
",
instance,
world_type,
world_id
)
.fetch_optional(exec)
.await?;
Ok(attached_data.map(|x| AttachedWorldData {
display_status: DisplayStatus::from_string(&x.display_status),
}))
}
pub async fn get_all_for_instance(
instance: &str,
exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite>,
) -> crate::Result<HashMap<(WorldType, String), Self>> {
let attached_data = sqlx::query!(
"
SELECT world_type, world_id, display_status
FROM attached_world_data
WHERE profile_path = $1
",
instance
)
.fetch_all(exec)
.await?;
Ok(attached_data
.into_iter()
.map(|x| {
let world_type = WorldType::from_string(&x.world_type);
let display_status =
DisplayStatus::from_string(&x.display_status);
(
(world_type, x.world_id),
AttachedWorldData { display_status },
)
})
.collect())
}
pub async fn remove_for_world(
instance: &str,
world_type: WorldType,
world_id: &str,
exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite>,
) -> crate::Result<()> {
let world_type = world_type.as_str();
sqlx::query!(
"
DELETE FROM attached_world_data
WHERE profile_path = $1 and world_type = $2 and world_id = $3
",
instance,
world_type,
world_id
)
.execute(exec)
.await?;
Ok(())
}
}
macro_rules! attached_data_setter {
($parameter:ident: $parameter_type:ty, $column:expr $(=> $adapter:expr)?) => {
paste! {
pub async fn [<set_ $parameter>](
instance: &str,
world_type: WorldType,
world_id: &str,
$parameter: $parameter_type,
exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite>,
) -> crate::Result<()> {
let world_type = world_type.as_str();
$(let $parameter = $adapter;)?
sqlx::query!(
"INSERT INTO attached_world_data (profile_path, world_type, world_id, " + $column + ")\n" +
"VALUES ($1, $2, $3, $4)\n" +
"ON CONFLICT (profile_path, world_type, world_id) DO UPDATE\n" +
" SET " + $column + " = $4",
instance,
world_type,
world_id,
$parameter
)
.execute(exec)
.await?;
Ok(())
}
}
}
}
attached_data_setter!(display_status: DisplayStatus, "display_status" => display_status.as_str());

View File

@@ -1,6 +1,6 @@
use crate::config::{META_URL, MODRINTH_API_URL, MODRINTH_API_URL_V3};
use crate::state::ProjectType;
use crate::util::fetch::{fetch_json, sha1_async, FetchSemaphore};
use crate::util::fetch::{FetchSemaphore, fetch_json, sha1_async};
use chrono::{DateTime, Utc};
use dashmap::DashSet;
use reqwest::Method;
@@ -843,7 +843,7 @@ impl CachedEntry {
fetch_semaphore: &FetchSemaphore,
pool: &SqlitePool,
) -> crate::Result<Vec<T>> {
const MAX_REQUEST_SIZE: usize = 1000;
const MAX_REQUEST_SIZE: usize = 800;
let urls = keys
.iter()
@@ -1072,7 +1072,7 @@ impl CachedEntry {
CacheValueType::File => {
let mut versions = fetch_json::<HashMap<String, Version>>(
Method::POST,
&format!("{}version_files", MODRINTH_API_URL),
&format!("{MODRINTH_API_URL}version_files"),
None,
Some(serde_json::json!({
"algorithm": "sha1",
@@ -1285,7 +1285,7 @@ impl CachedEntry {
if let Some(values) =
filtered_keys.iter_mut().find(|x| {
x.0 .0 == loaders_key && x.0 .1 == game_version
x.0.0 == loaders_key && x.0.1 == game_version
})
{
values.1.push(hash.to_string());
@@ -1307,7 +1307,7 @@ impl CachedEntry {
});
let version_update_url =
format!("{}version_files/update", MODRINTH_API_URL);
format!("{MODRINTH_API_URL}version_files/update");
let variations =
futures::future::try_join_all(filtered_keys.iter().map(
|((loaders_key, game_version), hashes)| {
@@ -1481,7 +1481,7 @@ pub async fn cache_file_hash(
CachedEntry::upsert_many(
&[CacheValue::FileHash(CachedFileHash {
path: format!("{}/{}", profile_path, path),
path: format!("{profile_path}/{path}"),
size: size as u64,
hash,
project_type,

View File

@@ -1,8 +1,8 @@
//! Theseus directory information
use crate::LoadingBarType;
use crate::event::emit::{emit_loading, init_loading};
use crate::state::{JavaVersion, Profile, Settings};
use crate::util::fetch::IoSemaphore;
use crate::LoadingBarType;
use dashmap::DashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;

View File

@@ -1,17 +1,16 @@
use std::{
sync::{atomic::AtomicBool, Arc},
time::{SystemTime, UNIX_EPOCH},
time::{SystemTime, UNIX_EPOCH}, // AstralRinth
};
use discord_rich_presence::{
activity::{Activity, Assets, Timestamps},
activity::{Activity, Assets, Timestamps}, // AstralRinth
DiscordIpc, DiscordIpcClient,
};
use rand::seq::SliceRandom;
use rand::seq::SliceRandom; // AstralRinth
use tokio::sync::RwLock;
// use crate::state::Profile;
use crate::util::utils;
use crate::util::utils; // AstralRinth
use crate::State;
pub struct DiscordGuard {
@@ -43,8 +42,7 @@ impl DiscordGuard {
let dipc =
DiscordIpcClient::new("1190718475832918136").map_err(|e| {
crate::ErrorKind::OtherError(format!(
"Could not create Discord client {}",
e,
"Could not create Discord client {e}",
))
})?;
@@ -133,8 +131,7 @@ impl DiscordGuard {
let res = client.set_activity(activity.clone());
let could_not_set_err = |e: Box<dyn serde::ser::StdError>| {
crate::ErrorKind::OtherError(format!(
"Could not update Discord activity {}",
e,
"Could not update Discord activity {e}",
))
};
@@ -142,8 +139,7 @@ impl DiscordGuard {
if let Err(_e) = res {
client.reconnect().map_err(|e| {
crate::ErrorKind::OtherError(format!(
"Could not reconnect to Discord IPC {}",
e,
"Could not reconnect to Discord IPC {e}",
))
})?;
return Ok(client
@@ -174,8 +170,7 @@ impl DiscordGuard {
let could_not_clear_err = |e: Box<dyn serde::ser::StdError>| {
crate::ErrorKind::OtherError(format!(
"Could not clear Discord activity {}",
e,
"Could not clear Discord activity {e}",
))
};
@@ -183,8 +178,7 @@ impl DiscordGuard {
if res.is_err() {
client.reconnect().map_err(|e| {
crate::ErrorKind::OtherError(format!(
"Could not reconnect to Discord IPC {}",
e,
"Could not reconnect to Discord IPC {e}",
))
})?;
return Ok(client

View File

@@ -1,32 +1,34 @@
use crate::config::{MODRINTH_API_URL_V3, MODRINTH_SOCKET_URL};
use crate::data::ModrinthCredentials;
use crate::event::emit::emit_friend;
use crate::event::FriendPayload;
use crate::event::emit::emit_friend;
use crate::state::tunnel::InternalTunnelSocket;
use crate::state::{ProcessManager, Profile, TunnelSocket};
use crate::util::fetch::{fetch_advanced, fetch_json, FetchSemaphore};
use crate::util::fetch::{FetchSemaphore, fetch_advanced, fetch_json};
use ariadne::ids::UserId;
use ariadne::networking::message::{
ClientToServerMessage, ServerToClientMessage,
};
use ariadne::users::{UserId, UserStatus};
use async_tungstenite::tokio::{connect_async, ConnectStream};
use async_tungstenite::tungstenite::client::IntoClientRequest;
use async_tungstenite::tungstenite::Message;
use ariadne::users::UserStatus;
use async_tungstenite::WebSocketStream;
use async_tungstenite::tokio::{ConnectStream, connect_async};
use async_tungstenite::tungstenite::Message;
use async_tungstenite::tungstenite::client::IntoClientRequest;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use either::Either;
use futures::stream::SplitSink;
use futures::{SinkExt, StreamExt};
use reqwest::header::HeaderValue;
use reqwest::Method;
use reqwest::header::HeaderValue;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::OwnedReadHalf;
use tokio::net::TcpStream;
use tokio::net::tcp::OwnedReadHalf;
use tokio::sync::{Mutex, RwLock};
use uuid::Uuid;
@@ -204,7 +206,10 @@ impl FriendsSocket {
}
}
Err(e) => {
tracing::error!("Error handling message from websocket server: {:?}", e);
tracing::error!(
"Error handling message from websocket server: {:?}",
e
);
}
}
}
@@ -258,7 +263,7 @@ impl FriendsSocket {
last_ping = Utc::now();
let mut write = state.friends_socket.write.write().await;
if let Some(write) = write.as_mut() {
let _ = write.send(Message::Ping(Vec::new())).await;
let _ = write.send(Message::Ping(Bytes::new())).await;
}
}

View File

@@ -1,30 +1,31 @@
use crate::event::emit::{emit_profile, emit_warning};
use crate::State;
use crate::event::ProfilePayloadType;
use crate::state::{DirectoryInfo, ProfileInstallStage, ProjectType};
use futures::{channel::mpsc::channel, SinkExt, StreamExt};
use crate::event::emit::{emit_profile, emit_warning};
use crate::state::{
DirectoryInfo, ProfileInstallStage, ProjectType, attached_world_data,
};
use crate::worlds::WorldType;
use notify::{RecommendedWatcher, RecursiveMode};
use notify_debouncer_mini::{new_debouncer, DebounceEventResult, Debouncer};
use notify_debouncer_mini::{DebounceEventResult, Debouncer, new_debouncer};
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::sync::{RwLock, mpsc::channel};
pub type FileWatcher = RwLock<Debouncer<RecommendedWatcher>>;
pub async fn init_watcher() -> crate::Result<FileWatcher> {
let (mut tx, mut rx) = channel(1);
let (tx, mut rx) = channel(1);
let file_watcher = new_debouncer(
Duration::from_secs_f32(1.0),
move |res: DebounceEventResult| {
futures::executor::block_on(async {
tx.send(res).await.unwrap();
})
tx.blocking_send(res).ok();
},
)?;
tokio::task::spawn(async move {
let span = tracing::span!(tracing::Level::INFO, "init_watcher");
tracing::info!(parent: &span, "Initting watcher");
while let Some(res) = rx.next().await {
while let Some(res) = rx.recv().await {
let _span = span.enter();
match res {
@@ -37,9 +38,7 @@ pub async fn init_watcher() -> crate::Result<FileWatcher> {
let mut found = false;
for component in e.path.components() {
if found {
profile_path = Some(
component.as_os_str().to_string_lossy(),
);
profile_path = Some(component.as_os_str());
break;
}
@@ -51,26 +50,87 @@ pub async fn init_watcher() -> crate::Result<FileWatcher> {
}
if let Some(profile_path) = profile_path {
if e.path
let profile_path_str =
profile_path.to_string_lossy().to_string();
let first_file_name = e
.path
.components()
.any(|x| x.as_os_str() == "crash-reports")
.skip_while(|x| x.as_os_str() != profile_path)
.nth(1)
.map(|x| x.as_os_str());
if first_file_name
.filter(|x| *x == "crash-reports")
.is_some()
&& e.path
.extension()
.map(|x| x == "txt")
.unwrap_or(false)
.filter(|x| *x == "txt")
.is_some()
{
crash_task(profile_path.to_string());
crash_task(profile_path_str);
} else if !visited_profiles.contains(&profile_path)
{
let path = profile_path.to_string();
tokio::spawn(async move {
let _ = emit_profile(
&path,
ProfilePayloadType::Synced,
)
.await;
});
visited_profiles.push(profile_path);
let event = if first_file_name
.filter(|x| *x == "servers.dat")
.is_some()
{
Some(ProfilePayloadType::ServersUpdated)
} else if first_file_name
.filter(|x| {
*x == "saves"
&& e.path
.file_name()
.filter(|x| *x == "level.dat")
.is_some()
})
.is_some()
{
tracing::info!(
"World updated: {}",
e.path.display()
);
let world = e
.path
.parent()
.unwrap()
.file_name()
.unwrap()
.to_string_lossy()
.to_string();
if !e.path.is_file() {
let profile_path_str = profile_path_str.clone();
let world = world.clone();
tokio::spawn(async move {
if let Ok(state) = State::get().await {
if let Err(e) = attached_world_data::AttachedWorldData::remove_for_world(
&profile_path_str,
WorldType::Singleplayer,
&world,
&state.pool
).await {
tracing::warn!("Failed to remove AttachedWorldData for '{world}': {e}")
}
}
});
}
Some(ProfilePayloadType::WorldUpdated { world })
} else if first_file_name
.filter(|x| *x == "saves")
.is_none()
{
Some(ProfilePayloadType::Synced)
} else {
None
};
if let Some(event) = event {
tokio::spawn(async move {
let _ = emit_profile(
&profile_path_str,
event,
)
.await;
});
visited_profiles.push(profile_path);
}
}
}
});
@@ -111,27 +171,47 @@ pub(crate) async fn watch_profile(
let profile_path = dirs.profiles_dir().join(profile_path);
if profile_path.exists() && profile_path.is_dir() {
for folder in ProjectType::iterator()
.map(|x| x.get_folder())
.chain(["crash-reports"])
{
let path = profile_path.join(folder);
for sub_path in ProjectType::iterator().map(|x| x.get_folder()).chain([
"crash-reports",
"saves",
"servers.dat",
]) {
let full_path = profile_path.join(sub_path);
if !path.exists() && !path.is_symlink() {
if let Err(e) = crate::util::io::create_dir_all(&path).await {
tracing::error!(
"Failed to create directory for watcher {path:?}: {e}"
);
return;
if !full_path.exists() && !full_path.is_symlink() {
if !sub_path.contains(".") {
if let Err(e) =
crate::util::io::create_dir_all(&full_path).await
{
tracing::error!(
"Failed to create directory for watcher {full_path:?}: {e}"
);
return;
}
} else if sub_path == "servers.dat" {
const EMPTY_NBT: &[u8] = &[
10, // Compound tag
0, 0, // Empty name
0, // End of compound tag
];
if let Err(e) =
crate::util::io::write(&full_path, EMPTY_NBT).await
{
tracing::error!(
"Failed to create file for watcher {full_path:?}: {e}"
);
return;
}
}
}
let mut watcher = watcher.write().await;
if let Err(e) =
watcher.watcher().watch(&path, RecursiveMode::Recursive)
if let Err(e) = watcher
.watcher()
.watch(&full_path, RecursiveMode::Recursive)
{
tracing::error!(
"Failed to watch directory for watcher {path:?}: {e}"
"Failed to watch directory for watcher {full_path:?}: {e}"
);
return;
}

View File

@@ -5,11 +5,11 @@ use crate::state;
use crate::state::{
CacheValue, CachedEntry, CachedFile, CachedFileHash, CachedFileUpdate,
Credentials, DefaultPage, DependencyType, DeviceToken, DeviceTokenKey,
DeviceTokenPair, FileType, Hooks, LinkedData, MemorySettings,
ModrinthCredentials, Profile, ProfileInstallStage, TeamMember, Theme,
VersionFile, WindowSize,
DeviceTokenPair, FileType, Hooks, LauncherFeatureVersion, LinkedData,
MemorySettings, ModrinthCredentials, Profile, ProfileInstallStage,
TeamMember, Theme, VersionFile, WindowSize,
};
use crate::util::fetch::{read_json, IoSemaphore};
use crate::util::fetch::{IoSemaphore, read_json};
use chrono::{DateTime, Utc};
use p256::ecdsa::SigningKey;
use p256::pkcs8::DecodePrivateKey;
@@ -250,9 +250,11 @@ where
.metadata
.game_version
.clone(),
loaders: vec![mod_loader
.as_str()
.to_string()],
loaders: vec![
mod_loader
.as_str()
.to_string(),
],
update_version_id:
update_version.id.clone(),
},
@@ -317,9 +319,11 @@ where
ProfileInstallStage::NotInstalled
}
},
launcher_feature_version: LauncherFeatureVersion::None,
name: profile.metadata.name,
icon_path: profile.metadata.icon,
game_version: profile.metadata.game_version,
protocol_version: None,
loader: profile.metadata.loader.into(),
loader_version: profile
.metadata

View File

@@ -1,18 +1,17 @@
use crate::util::fetch::REQWEST_CLIENT;
use crate::ErrorKind;
use base64::prelude::{BASE64_STANDARD, BASE64_URL_SAFE_NO_PAD};
use crate::util::fetch::REQWEST_CLIENT;
use base64::Engine;
use byteorder::BigEndian;
use base64::prelude::{BASE64_STANDARD, BASE64_URL_SAFE_NO_PAD};
use chrono::{DateTime, Duration, TimeZone, Utc};
use dashmap::DashMap;
use futures::TryStreamExt;
use p256::ecdsa::signature::Signer;
use p256::ecdsa::{Signature, SigningKey, VerifyingKey};
use p256::pkcs8::{DecodePrivateKey, EncodePrivateKey, LineEnding};
use rand::rngs::OsRng;
use rand::Rng;
use reqwest::header::HeaderMap;
use rand::rngs::OsRng;
use reqwest::Response;
use reqwest::header::HeaderMap;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::json;
@@ -62,12 +61,6 @@ pub enum MinecraftAuthenticationError {
#[source]
source: reqwest::Error,
},
#[error("Error creating signed request buffer {step:?}: {source}")]
ConstructingSignedRequest {
step: MinecraftAuthStep,
#[source]
source: std::io::Error,
},
#[error("Error reading XBOX Session ID header")]
NoSessionId,
#[error("Error reading user hash")]
@@ -1110,56 +1103,25 @@ async fn send_signed_request<T: DeserializeOwned>(
let time: u128 =
{ ((current_date.timestamp() as u128) + 11644473600) * 10000000 };
use byteorder::WriteBytesExt;
let mut buffer = Vec::new();
buffer.write_u32::<BigEndian>(1).map_err(|source| {
MinecraftAuthenticationError::ConstructingSignedRequest { source, step }
})?;
buffer.write_u8(0).map_err(|source| {
MinecraftAuthenticationError::ConstructingSignedRequest { source, step }
})?;
buffer
.write_u64::<BigEndian>(time as u64)
.map_err(|source| {
MinecraftAuthenticationError::ConstructingSignedRequest {
source,
step,
}
})?;
buffer.write_u8(0).map_err(|source| {
MinecraftAuthenticationError::ConstructingSignedRequest { source, step }
})?;
buffer.extend_from_slice(&1_u32.to_be_bytes()[..]);
buffer.push(0_u8);
buffer.extend_from_slice(&(time as u64).to_be_bytes()[..]);
buffer.push(0_u8);
buffer.extend_from_slice("POST".as_bytes());
buffer.write_u8(0).map_err(|source| {
MinecraftAuthenticationError::ConstructingSignedRequest { source, step }
})?;
buffer.push(0_u8);
buffer.extend_from_slice(url_path.as_bytes());
buffer.write_u8(0).map_err(|source| {
MinecraftAuthenticationError::ConstructingSignedRequest { source, step }
})?;
buffer.push(0_u8);
buffer.extend_from_slice(&auth);
buffer.write_u8(0).map_err(|source| {
MinecraftAuthenticationError::ConstructingSignedRequest { source, step }
})?;
buffer.push(0_u8);
buffer.extend_from_slice(&body);
buffer.write_u8(0).map_err(|source| {
MinecraftAuthenticationError::ConstructingSignedRequest { source, step }
})?;
buffer.push(0_u8);
let ecdsa_sig: Signature = key.key.sign(&buffer);
let mut sig_buffer = Vec::new();
sig_buffer.write_i32::<BigEndian>(1).map_err(|source| {
MinecraftAuthenticationError::ConstructingSignedRequest { source, step }
})?;
sig_buffer
.write_u64::<BigEndian>(time as u64)
.map_err(|source| {
MinecraftAuthenticationError::ConstructingSignedRequest {
source,
step,
}
})?;
sig_buffer.extend_from_slice(&1_i32.to_be_bytes()[..]);
sig_buffer.extend_from_slice(&(time as u64).to_be_bytes()[..]);
sig_buffer.extend_from_slice(&ecdsa_sig.r().to_bytes());
sig_buffer.extend_from_slice(&ecdsa_sig.s().to_bytes());
@@ -1224,6 +1186,6 @@ fn get_date_header(headers: &HeaderMap) -> DateTime<Utc> {
fn generate_oauth_challenge() -> String {
let mut rng = rand::thread_rng();
let bytes: Vec<u8> = (0..64).map(|_| rng.gen::<u8>()).collect();
bytes.iter().map(|byte| format!("{:02x}", byte)).collect()
let bytes: Vec<u8> = (0..64).map(|_| rng.r#gen::<u8>()).collect();
bytes.iter().map(|byte| format!("{byte:02x}")).collect()
}

View File

@@ -45,6 +45,9 @@ pub use self::mr_auth::*;
mod legacy_converter;
pub mod attached_world_data;
pub mod server_join_log;
// Global state
// RwLock on state only has concurrent reads, except for config dir change which takes control of the State
static LAUNCHER_STATE: OnceCell<Arc<State>> = OnceCell::const_new();
@@ -108,7 +111,9 @@ impl State {
/// Get the current launcher state, waiting for initialization
pub async fn get() -> crate::Result<Arc<Self>> {
if !LAUNCHER_STATE.initialized() {
tracing::error!("Attempted to get state before it is initialized - this should never happen!");
tracing::error!(
"Attempted to get state before it is initialized - this should never happen!"
);
while !LAUNCHER_STATE.initialized() {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}

View File

@@ -1,6 +1,6 @@
use crate::config::{MODRINTH_API_URL, MODRINTH_URL};
use crate::state::{CacheBehaviour, CachedEntry};
use crate::util::fetch::{fetch_advanced, FetchSemaphore};
use crate::util::fetch::{FetchSemaphore, fetch_advanced};
use chrono::{DateTime, Duration, TimeZone, Utc};
use dashmap::DashMap;
use futures::TryStreamExt;

View File

@@ -1,15 +1,23 @@
use crate::event::emit::emit_process;
use crate::event::ProcessPayloadType;
use crate::event::emit::{emit_process, emit_profile};
use crate::event::{ProcessPayloadType, ProfilePayloadType};
use crate::profile;
use crate::util::io::IOError;
use chrono::{DateTime, Utc};
use chrono::{DateTime, TimeZone, Utc};
use dashmap::DashMap;
use quick_xml::Reader;
use quick_xml::events::Event;
use serde::Deserialize;
use serde::Serialize;
use std::fs::OpenOptions;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::ExitStatus;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use uuid::Uuid;
const LAUNCHER_LOG_PATH: &str = "launcher_log.txt";
pub struct ProcessManager {
processes: DashMap<Uuid, Process>,
}
@@ -32,8 +40,16 @@ impl ProcessManager {
profile_path: &str,
mut mc_command: Command,
post_exit_command: Option<String>,
logs_folder: PathBuf,
xml_logging: bool,
) -> crate::Result<ProcessMetadata> {
let mc_proc = mc_command.spawn().map_err(IOError::from)?;
mc_command.stdout(std::process::Stdio::piped());
mc_command.stderr(std::process::Stdio::piped());
let mut mc_proc = mc_command.spawn().map_err(IOError::from)?;
let stdout = mc_proc.stdout.take();
let stderr = mc_proc.stderr.take();
let process = Process {
metadata: ProcessMetadata {
@@ -46,6 +62,65 @@ impl ProcessManager {
let metadata = process.metadata.clone();
if !logs_folder.exists() {
tokio::fs::create_dir_all(&logs_folder)
.await
.map_err(|e| IOError::with_path(e, &logs_folder))?;
}
let log_path = logs_folder.join(LAUNCHER_LOG_PATH);
{
let mut log_file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&log_path)
.map_err(|e| IOError::with_path(e, &log_path))?;
// Initialize with timestamp header
let now = chrono::Local::now();
writeln!(
log_file,
"# Minecraft launcher log started at {}",
now.format("%Y-%m-%d %H:%M:%S")
)
.map_err(|e| IOError::with_path(e, &log_path))?;
writeln!(log_file, "# Profile: {profile_path} \n")
.map_err(|e| IOError::with_path(e, &log_path))?;
writeln!(log_file).map_err(|e| IOError::with_path(e, &log_path))?;
}
if let Some(stdout) = stdout {
let log_path_clone = log_path.clone();
let profile_path = metadata.profile_path.clone();
tokio::spawn(async move {
Process::process_output(
&profile_path,
stdout,
log_path_clone,
xml_logging,
)
.await;
});
}
if let Some(stderr) = stderr {
let log_path_clone = log_path.clone();
let profile_path = metadata.profile_path.clone();
tokio::spawn(async move {
Process::process_output(
&profile_path,
stderr,
log_path_clone,
xml_logging,
)
.await;
});
}
tokio::spawn(Process::sequential_process_manager(
profile_path.to_string(),
post_exit_command,
@@ -120,7 +195,384 @@ struct Process {
child: Child,
}
#[derive(Debug, Default)]
struct Log4jEvent {
timestamp: Option<String>,
logger: Option<String>,
level: Option<String>,
thread: Option<String>,
message: Option<String>,
}
impl Process {
async fn process_output<R>(
profile_path: &str,
reader: R,
log_path: impl AsRef<Path>,
xml_logging: bool,
) where
R: tokio::io::AsyncRead + Unpin,
{
let mut buf_reader = BufReader::new(reader);
if xml_logging {
let mut reader = Reader::from_reader(buf_reader);
reader.config_mut().enable_all_checks(false);
let mut buf = Vec::new();
let mut current_event = Log4jEvent::default();
let mut in_event = false;
let mut in_message = false;
let mut in_throwable = false;
let mut current_content = String::new();
loop {
match reader.read_event_into_async(&mut buf).await {
Err(e) => {
tracing::error!(
"Error at position {}: {:?}",
reader.buffer_position(),
e
);
break;
}
// exits the loop when reaching end of file
Ok(Event::Eof) => break,
Ok(Event::Start(e)) => {
match e.name().as_ref() {
b"log4j:Event" => {
// Reset for new event
current_event = Log4jEvent::default();
in_event = true;
// Extract attributes
for attr in e.attributes().flatten() {
let key = String::from_utf8_lossy(
attr.key.into_inner(),
)
.to_string();
let value =
String::from_utf8_lossy(&attr.value)
.to_string();
match key.as_str() {
"logger" => {
current_event.logger = Some(value)
}
"level" => {
current_event.level = Some(value)
}
"thread" => {
current_event.thread = Some(value)
}
"timestamp" => {
current_event.timestamp =
Some(value)
}
_ => {}
}
}
}
b"log4j:Message" => {
in_message = true;
current_content = String::new();
}
b"log4j:Throwable" => {
in_throwable = true;
current_content = String::new();
}
_ => {}
}
}
Ok(Event::End(e)) => {
match e.name().as_ref() {
b"log4j:Message" => {
in_message = false;
current_event.message =
Some(current_content.clone());
}
b"log4j:Throwable" => {
in_throwable = false;
// Process and write the log entry
let thread = current_event
.thread
.as_deref()
.unwrap_or("");
let level = current_event
.level
.as_deref()
.unwrap_or("");
let logger = current_event
.logger
.as_deref()
.unwrap_or("");
if let Some(message) = &current_event.message {
let formatted_time =
Process::format_timestamp(
current_event.timestamp.as_deref(),
);
let formatted_log = format!(
"{} [{}] [{}{}]: {}\n",
formatted_time,
thread,
if !logger.is_empty() {
format!("{logger}/")
} else {
String::new()
},
level,
message.trim()
);
// Write the log message
if let Err(e) = Process::append_to_log_file(
&log_path,
&formatted_log,
) {
tracing::error!(
"Failed to write to log file: {}",
e
);
}
// Write the throwable if present
if !current_content.is_empty() {
if let Err(e) =
Process::append_to_log_file(
&log_path,
&current_content,
)
{
tracing::error!(
"Failed to write throwable to log file: {}",
e
);
}
}
}
}
b"log4j:Event" => {
in_event = false;
// If no throwable was present, write the log entry at the end of the event
if current_event.message.is_some()
&& !in_throwable
{
let thread = current_event
.thread
.as_deref()
.unwrap_or("");
let level = current_event
.level
.as_deref()
.unwrap_or("");
let logger = current_event
.logger
.as_deref()
.unwrap_or("");
let message = current_event
.message
.as_deref()
.unwrap_or("")
.trim();
let formatted_time =
Process::format_timestamp(
current_event.timestamp.as_deref(),
);
let formatted_log = format!(
"{} [{}] [{}{}]: {}\n",
formatted_time,
thread,
if !logger.is_empty() {
format!("{logger}/")
} else {
String::new()
},
level,
message
);
// Write the log message
if let Err(e) = Process::append_to_log_file(
&log_path,
&formatted_log,
) {
tracing::error!(
"Failed to write to log file: {}",
e
);
}
if let Some(timestamp) =
current_event.timestamp.as_deref()
{
if let Err(e) = Self::maybe_handle_server_join_logging(
profile_path,
timestamp,
message
).await {
tracing::error!("Failed to handle server join logging: {e}");
}
}
}
}
_ => {}
}
}
Ok(Event::Text(mut e)) => {
if in_message || in_throwable {
if let Ok(text) = e.unescape() {
current_content.push_str(&text);
}
} else if !in_event
&& !e.inplace_trim_end()
&& !e.inplace_trim_start()
{
if let Ok(text) = e.unescape() {
if let Err(e) = Process::append_to_log_file(
&log_path,
&format!("{text}\n"),
) {
tracing::error!(
"Failed to write to log file: {}",
e
);
}
}
}
}
Ok(Event::CData(e)) => {
if in_message || in_throwable {
if let Ok(text) = e
.escape()
.map_err(|x| x.into())
.and_then(|x| x.unescape())
{
current_content.push_str(&text);
}
}
}
_ => (),
}
buf.clear();
}
} else {
let mut line = String::new();
while let Ok(bytes_read) = buf_reader.read_line(&mut line).await {
if bytes_read == 0 {
break; // End of stream
}
if !line.is_empty() {
if let Err(e) = Self::append_to_log_file(&log_path, &line) {
tracing::warn!("Failed to write to log file: {}", e);
}
}
line.clear();
}
}
}
fn format_timestamp(timestamp: Option<&str>) -> String {
if let Some(timestamp_str) = timestamp {
if let Ok(timestamp_val) = timestamp_str.parse::<i64>() {
let datetime_utc = if timestamp_val > i32::MAX as i64 {
let secs = timestamp_val / 1000;
let nsecs = ((timestamp_val % 1000) * 1_000_000) as u32;
chrono::DateTime::<Utc>::from_timestamp(secs, nsecs)
.unwrap_or_default()
} else {
chrono::DateTime::<Utc>::from_timestamp(timestamp_val, 0)
.unwrap_or_default()
};
let datetime_local = datetime_utc.with_timezone(&chrono::Local);
format!("[{}]", datetime_local.format("%H:%M:%S"))
} else {
"[??:??:??]".to_string()
}
} else {
"[??:??:??]".to_string()
}
}
fn append_to_log_file(
path: impl AsRef<Path>,
line: &str,
) -> std::io::Result<()> {
let mut file =
OpenOptions::new().append(true).create(true).open(path)?;
file.write_all(line.as_bytes())?;
Ok(())
}
async fn maybe_handle_server_join_logging(
profile_path: &str,
timestamp: &str,
message: &str,
) -> crate::Result<()> {
let Some(host_port_string) = message.strip_prefix("Connecting to ")
else {
return Ok(());
};
let Some((host, port_string)) = host_port_string.rsplit_once(", ")
else {
return Ok(());
};
let Some(port) = port_string.parse::<u16>().ok() else {
return Ok(());
};
let timestamp = timestamp
.parse::<i64>()
.map(|x| x / 1000)
.map_err(|x| {
crate::ErrorKind::OtherError(format!(
"Failed to parse timestamp: {x}"
))
})
.and_then(|x| {
Utc.timestamp_opt(x, 0).single().ok_or_else(|| {
crate::ErrorKind::OtherError(
"Failed to convert timestamp to DateTime".to_string(),
)
})
})?;
let state = crate::State::get().await?;
crate::state::server_join_log::JoinLogEntry {
profile_path: profile_path.to_owned(),
host: host.to_string(),
port,
join_time: timestamp,
}
.upsert(&state.pool)
.await?;
{
let profile_path = profile_path.to_owned();
let host = host.to_owned();
tokio::spawn(async move {
let _ = emit_profile(
&profile_path,
ProfilePayloadType::ServerJoined {
host,
port,
timestamp,
},
)
.await;
});
}
Ok(())
}
// Spawns a new child process and inserts it into the hashmap
// Also, as the process ends, it spawns the follow-up process if it exists
// By convention, ExitStatus is last command's exit status, and we exit on the first non-zero exit status
@@ -204,6 +656,21 @@ impl Process {
}
});
let logs_folder = state.directories.profile_logs_dir(&profile_path);
let log_path = logs_folder.join(LAUNCHER_LOG_PATH);
if log_path.exists() {
if let Err(e) = Process::append_to_log_file(
&log_path,
&format!("\n# Process exited with status: {mc_exit_status}\n"),
) {
tracing::warn!(
"Failed to write exit status to log file: {}",
e
);
}
}
let _ = state.discord_rpc.clear_to_default(true).await;
let _ = state.friends_socket.update_status(None).await;

View File

@@ -1,28 +1,38 @@
use super::settings::{Hooks, MemorySettings, WindowSize};
use crate::profile::get_full_path;
use crate::state::server_join_log::JoinLogEntry;
use crate::state::{
cache_file_hash, CacheBehaviour, CachedEntry, CachedFileHash,
CacheBehaviour, CachedEntry, CachedFileHash, cache_file_hash,
};
use crate::util;
use crate::util::fetch::{write_cached_icon, FetchSemaphore, IoSemaphore};
use crate::util::fetch::{FetchSemaphore, IoSemaphore, write_cached_icon};
use crate::util::io::{self};
use chrono::{DateTime, TimeZone, Utc};
use chrono::{DateTime, TimeDelta, TimeZone, Utc};
use dashmap::DashMap;
use regex::Regex;
use serde::{Deserialize, Serialize};
use sqlx::SqlitePool;
use std::collections::HashSet;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::path::Path;
use std::sync::LazyLock;
use tokio::fs::DirEntry;
use tokio::io::{AsyncBufReadExt, AsyncRead};
use tokio::task::JoinSet;
// Represent a Minecraft instance.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Profile {
pub path: String,
pub install_stage: ProfileInstallStage,
pub launcher_feature_version: LauncherFeatureVersion,
pub name: String,
pub icon_path: Option<String>,
pub game_version: String,
pub protocol_version: Option<i32>,
pub loader: ModLoader,
pub loader_version: Option<String>,
@@ -86,6 +96,38 @@ impl ProfileInstallStage {
}
}
#[derive(
Serialize, Deserialize, Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd,
)]
#[serde(rename_all = "snake_case")]
pub enum LauncherFeatureVersion {
None,
MigratedServerLastPlayTime,
}
impl LauncherFeatureVersion {
pub const MOST_RECENT: Self = Self::MigratedServerLastPlayTime;
pub fn as_str(&self) -> &'static str {
match *self {
Self::None => "none",
Self::MigratedServerLastPlayTime => {
"migrated_server_last_play_time"
}
}
}
pub fn from_str(val: &str) -> Self {
match val {
"none" => Self::None,
"migrated_server_last_play_time" => {
Self::MigratedServerLastPlayTime
}
_ => Self::None,
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct LinkedData {
pub project_id: String,
@@ -261,6 +303,8 @@ struct ProfileQueryResult {
override_hook_pre_launch: Option<String>,
override_hook_wrapper: Option<String>,
override_hook_post_exit: Option<String>,
protocol_version: Option<i64>,
launcher_feature_version: String,
}
impl TryFrom<ProfileQueryResult> for Profile {
@@ -270,9 +314,13 @@ impl TryFrom<ProfileQueryResult> for Profile {
Ok(Profile {
path: x.path,
install_stage: ProfileInstallStage::from_str(&x.install_stage),
launcher_feature_version: LauncherFeatureVersion::from_str(
&x.launcher_feature_version,
),
name: x.name,
icon_path: x.icon_path,
game_version: x.game_version,
protocol_version: x.protocol_version.map(|x| x as i32),
loader: ModLoader::from_string(&x.mod_loader),
loader_version: x.mod_loader_version,
groups: serde_json::from_value(x.groups).unwrap_or_default(),
@@ -336,8 +384,8 @@ macro_rules! select_profiles_with_predicate {
ProfileQueryResult,
r#"
SELECT
path, install_stage, name, icon_path,
game_version, mod_loader, mod_loader_version,
path, install_stage, launcher_feature_version, name, icon_path,
game_version, protocol_version, mod_loader, mod_loader_version,
json(groups) as "groups!: serde_json::Value",
linked_project_id, linked_version_id, locked,
created, modified, last_played,
@@ -399,6 +447,8 @@ impl Profile {
exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite>,
) -> crate::Result<()> {
let install_stage = self.install_stage.as_str();
let launcher_feature_version = self.launcher_feature_version.as_str();
let mod_loader = self.loader.as_str();
let groups = serde_json::to_string(&self.groups)?;
@@ -435,7 +485,8 @@ impl Profile {
submitted_time_played, recent_time_played,
override_java_path, override_extra_launch_args, override_custom_env_vars,
override_mc_memory_max, override_mc_force_fullscreen, override_mc_game_resolution_x, override_mc_game_resolution_y,
override_hook_pre_launch, override_hook_wrapper, override_hook_post_exit
override_hook_pre_launch, override_hook_wrapper, override_hook_post_exit,
protocol_version, launcher_feature_version
)
VALUES (
$1, $2, $3, $4,
@@ -446,7 +497,8 @@ impl Profile {
$15, $16,
$17, jsonb($18), jsonb($19),
$20, $21, $22, $23,
$24, $25, $26
$24, $25, $26,
$27, $28
)
ON CONFLICT (path) DO UPDATE SET
install_stage = $2,
@@ -480,7 +532,10 @@ impl Profile {
override_hook_pre_launch = $24,
override_hook_wrapper = $25,
override_hook_post_exit = $26
override_hook_post_exit = $26,
protocol_version = $27,
launcher_feature_version = $28
",
self.path,
install_stage,
@@ -508,6 +563,8 @@ impl Profile {
self.hooks.pre_launch,
self.hooks.wrapper,
self.hooks.post_exit,
self.protocol_version,
launcher_feature_version
)
.execute(exec)
.await?;
@@ -557,10 +614,10 @@ impl Profile {
let mut all = Self::get_all(&state.pool).await?;
let mut keys = vec![];
let mut migrations = JoinSet::new();
for profile in &mut all {
let path =
crate::api::profile::get_full_path(&profile.path).await?;
let path = get_full_path(&profile.path).await?;
for project_type in ProjectType::iterator() {
let folder = project_type.get_folder();
@@ -602,7 +659,42 @@ impl Profile {
profile.install_stage = ProfileInstallStage::NotInstalled;
profile.upsert(&state.pool).await?;
}
if profile.launcher_feature_version
< LauncherFeatureVersion::MOST_RECENT
{
let state = state.clone();
let profile_path = profile.path.clone();
migrations.spawn(async move {
let Ok(Some(mut profile)) = Self::get(&profile_path, &state.pool).await else {
tracing::error!("Failed to find instance '{}' for migration", profile_path);
return;
};
drop(profile_path);
tracing::info!(
"Migrating profile '{}' from launcher feature version {:?} to {:?}",
profile.path, profile.launcher_feature_version, LauncherFeatureVersion::MOST_RECENT
);
loop {
let result = profile.perform_launcher_feature_migration(&state).await;
if result.is_err() || profile.launcher_feature_version == LauncherFeatureVersion::MOST_RECENT {
if let Err(err) = result {
tracing::error!("Failed to migrate instance '{}': {}", profile.path, err);
return;
}
if let Err(err) = profile.upsert(&state.pool).await {
tracing::error!("Failed to update instance '{}' migration state: {}", profile.path, err);
return;
}
break;
}
}
tracing::info!("Finished migration for profile '{}'", profile.path);
});
}
}
migrations.join_all().await;
let file_hashes = CachedEntry::get_file_hash_many(
&keys.iter().map(|s| &**s).collect::<Vec<_>>(),
@@ -643,6 +735,144 @@ impl Profile {
Ok(())
}
async fn perform_launcher_feature_migration(
&mut self,
state: &crate::State,
) -> crate::Result<()> {
match self.launcher_feature_version {
LauncherFeatureVersion::None => {
if self.last_played.is_none() {
self.launcher_feature_version =
LauncherFeatureVersion::MigratedServerLastPlayTime;
return Ok(());
}
let mut join_log_entry = JoinLogEntry {
profile_path: self.path.clone(),
..Default::default()
};
let logs_path = state.directories.profile_logs_dir(&self.path);
let Ok(mut directory) = io::read_dir(&logs_path).await else {
self.launcher_feature_version =
LauncherFeatureVersion::MigratedServerLastPlayTime;
return Ok(());
};
let existing_joins_map =
super::server_join_log::get_joins(&self.path, &state.pool)
.await?;
let existing_joins = existing_joins_map
.keys()
.map(|x| (&x.0 as &str, x.1))
.collect::<HashSet<_>>();
while let Some(log_file) = directory.next_entry().await? {
if let Err(err) = Self::parse_log_file(
&log_file,
|host, port| existing_joins.contains(&(host, port)),
state,
&mut join_log_entry,
)
.await
{
tracing::error!(
"Failed to parse log file '{}': {}",
log_file.path().display(),
err
);
}
}
self.launcher_feature_version =
LauncherFeatureVersion::MigratedServerLastPlayTime;
}
LauncherFeatureVersion::MOST_RECENT => unreachable!(
"LauncherFeatureVersion::MOST_RECENT was not updated"
),
}
Ok(())
}
// Parses a log file on a best-effort basis, using the log's creation time, rather than the
// actual times mentioned in the log file, which are missing date information.
async fn parse_log_file(
log_file: &DirEntry,
should_skip: impl Fn(&str, u16) -> bool,
state: &crate::State,
join_entry: &mut JoinLogEntry,
) -> crate::Result<()> {
let file_name = log_file.file_name();
let Some(file_name) = file_name.to_str() else {
return Ok(());
};
let log_time = io::metadata(&log_file.path()).await?.created()?.into();
if file_name == "latest.log" {
let file = io::open_file(&log_file.path()).await?;
Self::parse_open_log_file(
file,
should_skip,
log_time,
state,
join_entry,
)
.await
} else if file_name.ends_with(".log.gz") {
let file = io::open_file(&log_file.path()).await?;
let file = tokio::io::BufReader::new(file);
let file =
async_compression::tokio::bufread::GzipDecoder::new(file);
Self::parse_open_log_file(
file,
should_skip,
log_time,
state,
join_entry,
)
.await
} else {
Ok(())
}
}
async fn parse_open_log_file(
reader: impl AsyncRead + Unpin,
should_skip: impl Fn(&str, u16) -> bool,
mut log_time: DateTime<Utc>,
state: &crate::State,
join_entry: &mut JoinLogEntry,
) -> crate::Result<()> {
static LOG_LINE_REGEX: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"^\[[0-9]{2}(?::[0-9]{2}){2}] \[.+?/[A-Z]+?]: Connecting to (.+?), ([1-9][0-9]{0,4})$").unwrap()
});
let reader = tokio::io::BufReader::new(reader);
let mut lines = reader.lines();
while let Some(log_line) = lines.next_line().await? {
let Some(log_line) = LOG_LINE_REGEX.captures(&log_line) else {
continue;
};
let Some(host) = log_line.get(1) else {
continue;
};
let host = host.as_str();
let Some(port) = log_line.get(2) else {
continue;
};
let Ok(port) = port.as_str().parse::<u16>() else {
continue;
};
if should_skip(host, port) {
continue;
}
join_entry.host = host.to_string();
join_entry.port = port;
join_entry.join_time = log_time;
join_entry.upsert(&state.pool).await?;
log_time += TimeDelta::seconds(1);
}
Ok(())
}
pub async fn get_projects(
&self,
cache_behaviour: Option<CacheBehaviour>,

View File

@@ -0,0 +1,65 @@
use std::collections::HashMap;
use chrono::{DateTime, TimeZone, Utc};
#[derive(Default)]
pub struct JoinLogEntry {
pub profile_path: String,
pub host: String,
pub port: u16,
pub join_time: DateTime<Utc>,
}
impl JoinLogEntry {
pub async fn upsert(
&self,
exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite>,
) -> crate::Result<()> {
let join_time = self.join_time.timestamp();
sqlx::query!(
"
INSERT INTO join_log (profile_path, host, port, join_time)
VALUES ($1, $2, $3, $4)
ON CONFLICT (profile_path, host, port) DO UPDATE SET
join_time = $4
",
self.profile_path,
self.host,
self.port,
join_time
)
.execute(exec)
.await?;
Ok(())
}
}
pub async fn get_joins(
instance: &str,
exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite>,
) -> crate::Result<HashMap<(String, u16), DateTime<Utc>>> {
let joins = sqlx::query!(
"
SELECT profile_path, host, port, join_time
FROM join_log
WHERE profile_path = $1
",
instance
)
.fetch_all(exec)
.await?;
Ok(joins
.into_iter()
.map(|x| {
(
(x.host, x.port as u16),
Utc.timestamp_opt(x.join_time, 0)
.single()
.unwrap_or_else(Utc::now),
)
})
.collect())
}

View File

@@ -44,6 +44,8 @@ pub struct Settings {
pub enum FeatureFlag {
PagePath,
ProjectBackground,
WorldsTab,
WorldsInHome,
}
impl Settings {

View File

@@ -1,5 +1,5 @@
use crate::state::friends::{TunnelSockets, WriteSocket};
use crate::state::FriendsSocket;
use crate::state::friends::{TunnelSockets, WriteSocket};
use ariadne::networking::message::ClientToServerMessage;
use std::net::SocketAddr;
use std::sync::Arc;