You've already forked AstralRinth
forked from didirus/AstralRinth
Revert "Implement a more robust IPC system between the launcher and client (#4159)"
This reverts commit 5ffcc48d75.
This commit is contained in:
@@ -35,9 +35,6 @@ pub mod prelude {
|
||||
jre, metadata, minecraft_auth, mr_auth, pack, process,
|
||||
profile::{self, Profile, create},
|
||||
settings,
|
||||
util::{
|
||||
io::{IOError, canonicalize},
|
||||
network::tcp_listen_any_loopback,
|
||||
},
|
||||
util::io::{IOError, canonicalize},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -151,9 +151,6 @@ pub enum ErrorKind {
|
||||
"A skin texture must have a dimension of either 64x64 or 64x32 pixels"
|
||||
)]
|
||||
InvalidSkinTexture,
|
||||
|
||||
#[error("RPC error: {0}")]
|
||||
RpcError(String),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -16,7 +16,6 @@ use daedalus::{
|
||||
use dunce::canonicalize;
|
||||
use hashlink::LinkedHashSet;
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::net::SocketAddr;
|
||||
use std::{collections::HashMap, path::Path};
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -125,7 +124,6 @@ pub fn get_jvm_arguments(
|
||||
quick_play_type: &QuickPlayType,
|
||||
quick_play_version: QuickPlayVersion,
|
||||
log_config: Option<&LoggingConfiguration>,
|
||||
ipc_addr: SocketAddr,
|
||||
) -> crate::Result<Vec<String>> {
|
||||
let mut parsed_arguments = Vec::new();
|
||||
|
||||
@@ -183,11 +181,6 @@ pub fn get_jvm_arguments(
|
||||
.to_string_lossy()
|
||||
));
|
||||
|
||||
parsed_arguments
|
||||
.push(format!("-Dmodrinth.internal.ipc.host={}", ipc_addr.ip()));
|
||||
parsed_arguments
|
||||
.push(format!("-Dmodrinth.internal.ipc.port={}", ipc_addr.port()));
|
||||
|
||||
parsed_arguments.push(format!(
|
||||
"-Dmodrinth.internal.quickPlay.serverVersion={}",
|
||||
serde_json::to_value(quick_play_version.server)?
|
||||
|
||||
@@ -12,7 +12,6 @@ use crate::state::{
|
||||
Credentials, JavaVersion, ProcessMetadata, ProfileInstallStage,
|
||||
};
|
||||
use crate::util::io;
|
||||
use crate::util::rpc::RpcServerBuilder;
|
||||
use crate::{State, get_resource_file, process, state as st};
|
||||
use chrono::Utc;
|
||||
use daedalus as d;
|
||||
@@ -23,6 +22,7 @@ use serde::Deserialize;
|
||||
use st::Profile;
|
||||
use std::fmt::Write;
|
||||
use std::path::PathBuf;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::process::Command;
|
||||
|
||||
mod args;
|
||||
@@ -608,8 +608,6 @@ pub async fn launch_minecraft(
|
||||
let (main_class_keep_alive, main_class_path) =
|
||||
get_resource_file!(env "JAVA_JARS_DIR" / "theseus.jar")?;
|
||||
|
||||
let rpc_server = RpcServerBuilder::new().launch().await?;
|
||||
|
||||
command.args(
|
||||
args::get_jvm_arguments(
|
||||
args.get(&d::minecraft::ArgumentType::Jvm)
|
||||
@@ -635,7 +633,6 @@ pub async fn launch_minecraft(
|
||||
.logging
|
||||
.as_ref()
|
||||
.and_then(|x| x.get(&LoggingSide::Client)),
|
||||
rpc_server.address(),
|
||||
)?
|
||||
.into_iter(),
|
||||
);
|
||||
@@ -770,8 +767,7 @@ pub async fn launch_minecraft(
|
||||
state.directories.profile_logs_dir(&profile.path),
|
||||
version_info.logging.is_some(),
|
||||
main_class_keep_alive,
|
||||
rpc_server,
|
||||
async |process: &ProcessMetadata, rpc_server| {
|
||||
async |process: &ProcessMetadata, stdin| {
|
||||
let process_start_time = process.start_time.to_rfc3339();
|
||||
let profile_created_time = profile.created.to_rfc3339();
|
||||
let profile_modified_time = profile.modified.to_rfc3339();
|
||||
@@ -794,11 +790,14 @@ pub async fn launch_minecraft(
|
||||
let Some(value) = value else {
|
||||
continue;
|
||||
};
|
||||
rpc_server
|
||||
.call_method_2::<()>("set_system_property", key, value)
|
||||
.await?;
|
||||
stdin.write_all(b"property\t").await?;
|
||||
stdin.write_all(key.as_bytes()).await?;
|
||||
stdin.write_u8(b'\t').await?;
|
||||
stdin.write_all(value.as_bytes()).await?;
|
||||
stdin.write_u8(b'\n').await?;
|
||||
}
|
||||
rpc_server.call_method::<()>("launch").await?;
|
||||
stdin.write_all(b"launch\n").await?;
|
||||
stdin.flush().await?;
|
||||
Ok(())
|
||||
},
|
||||
)
|
||||
|
||||
@@ -2,7 +2,6 @@ use crate::event::emit::{emit_process, emit_profile};
|
||||
use crate::event::{ProcessPayloadType, ProfilePayloadType};
|
||||
use crate::profile;
|
||||
use crate::util::io::IOError;
|
||||
use crate::util::rpc::RpcServer;
|
||||
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
|
||||
use dashmap::DashMap;
|
||||
use quick_xml::Reader;
|
||||
@@ -16,7 +15,7 @@ use std::path::{Path, PathBuf};
|
||||
use std::process::ExitStatus;
|
||||
use tempfile::TempDir;
|
||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||
use tokio::process::{Child, Command};
|
||||
use tokio::process::{Child, ChildStdin, Command};
|
||||
use uuid::Uuid;
|
||||
|
||||
const LAUNCHER_LOG_PATH: &str = "launcher_log.txt";
|
||||
@@ -47,10 +46,9 @@ impl ProcessManager {
|
||||
logs_folder: PathBuf,
|
||||
xml_logging: bool,
|
||||
main_class_keep_alive: TempDir,
|
||||
rpc_server: RpcServer,
|
||||
post_process_init: impl AsyncFnOnce(
|
||||
&ProcessMetadata,
|
||||
&RpcServer,
|
||||
&mut ChildStdin,
|
||||
) -> crate::Result<()>,
|
||||
) -> crate::Result<ProcessMetadata> {
|
||||
mc_command.stdout(std::process::Stdio::piped());
|
||||
@@ -69,12 +67,14 @@ impl ProcessManager {
|
||||
profile_path: profile_path.to_string(),
|
||||
},
|
||||
child: mc_proc,
|
||||
rpc_server,
|
||||
_main_class_keep_alive: main_class_keep_alive,
|
||||
};
|
||||
|
||||
if let Err(e) =
|
||||
post_process_init(&process.metadata, &process.rpc_server).await
|
||||
if let Err(e) = post_process_init(
|
||||
&process.metadata,
|
||||
&mut process.child.stdin.as_mut().unwrap(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!("Failed to run post-process init: {e}");
|
||||
let _ = process.child.kill().await;
|
||||
@@ -165,10 +165,6 @@ impl ProcessManager {
|
||||
self.processes.get(&id).map(|x| x.metadata.clone())
|
||||
}
|
||||
|
||||
pub fn get_rpc(&self, id: Uuid) -> Option<RpcServer> {
|
||||
self.processes.get(&id).map(|x| x.rpc_server.clone())
|
||||
}
|
||||
|
||||
pub fn get_all(&self) -> Vec<ProcessMetadata> {
|
||||
self.processes
|
||||
.iter()
|
||||
@@ -219,7 +215,6 @@ struct Process {
|
||||
metadata: ProcessMetadata,
|
||||
child: Child,
|
||||
_main_class_keep_alive: TempDir,
|
||||
rpc_server: RpcServer,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
|
||||
@@ -2,8 +2,6 @@
|
||||
pub mod fetch;
|
||||
pub mod io;
|
||||
pub mod jre;
|
||||
pub mod network;
|
||||
pub mod platform;
|
||||
pub mod protocol_version;
|
||||
pub mod rpc;
|
||||
pub mod server_ping;
|
||||
|
||||
@@ -1,17 +0,0 @@
|
||||
use std::io;
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
pub async fn tcp_listen_any_loopback() -> io::Result<TcpListener> {
|
||||
// IPv4 is tried first for the best compatibility and performance with most systems.
|
||||
// IPv6 is also tried in case IPv4 is not available. Resolving "localhost" is avoided
|
||||
// to prevent failures deriving from improper name resolution setup. Any available
|
||||
// ephemeral port is used to prevent conflicts with other services. This is all as per
|
||||
// RFC 8252's recommendations
|
||||
const ANY_LOOPBACK_SOCKET: &[SocketAddr] = &[
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
|
||||
SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0),
|
||||
];
|
||||
|
||||
TcpListener::bind(ANY_LOOPBACK_SOCKET).await
|
||||
}
|
||||
@@ -1,258 +0,0 @@
|
||||
use crate::prelude::tcp_listen_any_loopback;
|
||||
use crate::{ErrorKind, Result};
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio::task::AbortHandle;
|
||||
use tokio_util::codec::{Decoder, LinesCodec, LinesCodecError};
|
||||
use uuid::Uuid;
|
||||
|
||||
type HandlerFuture = Pin<Box<dyn Send + Future<Output = Result<Value>>>>;
|
||||
type HandlerMethod = Box<dyn Send + Sync + Fn(Vec<Value>) -> HandlerFuture>;
|
||||
type HandlerMap = HashMap<&'static str, HandlerMethod>;
|
||||
type WaitingResponsesMap =
|
||||
Arc<Mutex<HashMap<Uuid, oneshot::Sender<Result<Value>>>>>;
|
||||
|
||||
pub struct RpcServerBuilder {
|
||||
handlers: HandlerMap,
|
||||
}
|
||||
|
||||
impl RpcServerBuilder {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
handlers: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
// We'll use this function in the future. Please remove this #[allow] when we do.
|
||||
#[allow(dead_code)]
|
||||
pub fn handler(
|
||||
mut self,
|
||||
function_name: &'static str,
|
||||
handler: HandlerMethod,
|
||||
) -> Self {
|
||||
self.handlers.insert(function_name, Box::new(handler));
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn launch(self) -> Result<RpcServer> {
|
||||
let socket = tcp_listen_any_loopback().await?;
|
||||
let address = socket.local_addr()?;
|
||||
let (message_sender, message_receiver) = mpsc::unbounded_channel();
|
||||
let waiting_responses = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
let join_handle = {
|
||||
let waiting_responses = waiting_responses.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut server = RunningRpcServer {
|
||||
message_receiver,
|
||||
handlers: self.handlers,
|
||||
waiting_responses: waiting_responses.clone(),
|
||||
};
|
||||
if let Err(e) = server.run(socket).await {
|
||||
tracing::error!("Failed to run RPC server: {e}");
|
||||
}
|
||||
waiting_responses.lock().unwrap().clear();
|
||||
})
|
||||
};
|
||||
Ok(RpcServer {
|
||||
address,
|
||||
message_sender,
|
||||
waiting_responses,
|
||||
abort_handle: join_handle.abort_handle(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RpcServer {
|
||||
address: SocketAddr,
|
||||
message_sender: mpsc::UnboundedSender<RpcMessage>,
|
||||
waiting_responses: WaitingResponsesMap,
|
||||
abort_handle: AbortHandle,
|
||||
}
|
||||
|
||||
impl RpcServer {
|
||||
pub fn address(&self) -> SocketAddr {
|
||||
self.address
|
||||
}
|
||||
|
||||
pub async fn call_method<R: DeserializeOwned>(
|
||||
&self,
|
||||
method: &str,
|
||||
) -> Result<R> {
|
||||
self.call_method_any(method, vec![]).await
|
||||
}
|
||||
|
||||
pub async fn call_method_2<R: DeserializeOwned>(
|
||||
&self,
|
||||
method: &str,
|
||||
arg1: impl Serialize,
|
||||
arg2: impl Serialize,
|
||||
) -> Result<R> {
|
||||
self.call_method_any(
|
||||
method,
|
||||
vec![serde_json::to_value(arg1)?, serde_json::to_value(arg2)?],
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn call_method_any<R: DeserializeOwned>(
|
||||
&self,
|
||||
method: &str,
|
||||
args: Vec<Value>,
|
||||
) -> Result<R> {
|
||||
if self.message_sender.is_closed() {
|
||||
return Err(ErrorKind::RpcError(
|
||||
"RPC connection closed".to_string(),
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
let id = Uuid::new_v4();
|
||||
let (send, recv) = oneshot::channel();
|
||||
self.waiting_responses.lock().unwrap().insert(id, send);
|
||||
|
||||
let message = RpcMessage {
|
||||
id,
|
||||
body: RpcMessageBody::Call {
|
||||
method: method.to_owned(),
|
||||
args,
|
||||
},
|
||||
};
|
||||
if self.message_sender.send(message).is_err() {
|
||||
self.waiting_responses.lock().unwrap().remove(&id);
|
||||
return Err(ErrorKind::RpcError(
|
||||
"RPC connection closed while sending".to_string(),
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
tracing::debug!("Waiting on result for {id}");
|
||||
let Ok(result) = recv.await else {
|
||||
self.waiting_responses.lock().unwrap().remove(&id);
|
||||
return Err(ErrorKind::RpcError(
|
||||
"RPC connection closed while waiting for response".to_string(),
|
||||
)
|
||||
.into());
|
||||
};
|
||||
result.and_then(|x| Ok(serde_json::from_value(x)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RpcServer {
|
||||
fn drop(&mut self) {
|
||||
self.abort_handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
struct RunningRpcServer {
|
||||
message_receiver: mpsc::UnboundedReceiver<RpcMessage>,
|
||||
handlers: HandlerMap,
|
||||
waiting_responses: WaitingResponsesMap,
|
||||
}
|
||||
|
||||
impl RunningRpcServer {
|
||||
async fn run(&mut self, listener: TcpListener) -> Result<()> {
|
||||
let (socket, _) = listener.accept().await?;
|
||||
drop(listener);
|
||||
|
||||
let mut socket = LinesCodec::new().framed(socket);
|
||||
loop {
|
||||
let to_send = tokio::select! {
|
||||
message = self.message_receiver.recv() => {
|
||||
if message.is_none() {
|
||||
break;
|
||||
}
|
||||
message
|
||||
},
|
||||
message = socket.next() => {
|
||||
let message: RpcMessage = match message {
|
||||
None => break,
|
||||
Some(Ok(message)) => serde_json::from_str(&message)?,
|
||||
Some(Err(LinesCodecError::Io(e))) => Err(e)?,
|
||||
Some(Err(LinesCodecError::MaxLineLengthExceeded)) => unreachable!(),
|
||||
};
|
||||
self.handle_message(message).await?
|
||||
},
|
||||
};
|
||||
if let Some(message) = to_send {
|
||||
let json = serde_json::to_string(&message)?;
|
||||
match socket.send(json).await {
|
||||
Ok(()) => {}
|
||||
Err(LinesCodecError::Io(e)) => Err(e)?,
|
||||
Err(LinesCodecError::MaxLineLengthExceeded) => {
|
||||
unreachable!()
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_message(
|
||||
&self,
|
||||
message: RpcMessage,
|
||||
) -> Result<Option<RpcMessage>> {
|
||||
if let RpcMessageBody::Call { method, args } = message.body {
|
||||
let response = match self.handlers.get(method.as_str()) {
|
||||
Some(handler) => match handler(args).await {
|
||||
Ok(result) => RpcMessageBody::Respond { response: result },
|
||||
Err(e) => RpcMessageBody::Error {
|
||||
error: e.to_string(),
|
||||
},
|
||||
},
|
||||
None => RpcMessageBody::Error {
|
||||
error: format!("Unknown theseus RPC method {method}"),
|
||||
},
|
||||
};
|
||||
Ok(Some(RpcMessage {
|
||||
id: message.id,
|
||||
body: response,
|
||||
}))
|
||||
} else if let Some(sender) =
|
||||
self.waiting_responses.lock().unwrap().remove(&message.id)
|
||||
{
|
||||
let _ = sender.send(match message.body {
|
||||
RpcMessageBody::Respond { response } => Ok(response),
|
||||
RpcMessageBody::Error { error } => {
|
||||
Err(ErrorKind::RpcError(error).into())
|
||||
}
|
||||
_ => unreachable!(),
|
||||
});
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct RpcMessage {
|
||||
id: Uuid,
|
||||
#[serde(flatten)]
|
||||
body: RpcMessageBody,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
enum RpcMessageBody {
|
||||
Call {
|
||||
method: String,
|
||||
args: Vec<Value>,
|
||||
},
|
||||
Respond {
|
||||
#[serde(default, skip_serializing_if = "Value::is_null")]
|
||||
response: Value,
|
||||
},
|
||||
Error {
|
||||
error: String,
|
||||
},
|
||||
}
|
||||
Reference in New Issue
Block a user