Rustic cleanups, dedups and making the code less hard to read in general (#251)

* typos :help_me:

* (part 1/?) massive cleanup to make the code more Rustic and cut down heap allocations.

* (part 2/?) massive cleanup to make the code more Rustic and cut down heap allocations.

* (part 3/?) cut down some pretty major heap allocations here - more `Bytes` and `BytesMut`s, fewer `Vec<u8>`s

Also, it is unclear why `to_vec` is called here when the resulting vector is never used again afterwards.

* (part 4/?) deduplicate error handling in backblaze logic

* (part 5/?) fixes, cleanups, refactors, and reformatting

* (part 6/?) cleanups and refactors

* remove loads of `as_str` in types that already are `Display`

* Revert "remove loads of `as_str` in types that already are `Display`"

This reverts commit 4f974310cfb167ceba03001d81388db4f0fbb509.

* reformat and move routes util to the util module

* use streams

* Run prepare + formatting issues

Co-authored-by: Jai A <jaiagr+gpg@pm.me>
Co-authored-by: Geometrically <18202329+Geometrically@users.noreply.github.com>
This commit is contained in:
Leo Chen
2021-10-12 11:26:59 +08:00
committed by GitHub
parent 0010119440
commit 13187de97d
53 changed files with 997 additions and 1129 deletions

View File

@@ -1,28 +1,27 @@
use crate::file_hosting::S3Host;
use crate::health::scheduler::HealthCounters;
use crate::util::env::{parse_strings_from_var, parse_var};
use actix_cors::Cors;
use actix_ratelimit::errors::ARError;
use actix_ratelimit::{MemoryStore, MemoryStoreActor, RateLimiter};
use actix_web::{http, web, App, HttpServer};
use actix_web_prom::PrometheusMetricsBuilder;
use env_logger::Env;
use gumdrop::Options;
use log::{error, info, warn};
use rand::Rng;
use search::indexing::index_projects;
use search::indexing::IndexingSettings;
use std::sync::Arc;
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use crate::health::pod::PodInfo;
use crate::health::scheduler::HealthCounters;
use actix_web_prom::{PrometheusMetricsBuilder};
use std::sync::Arc;
mod database;
mod file_hosting;
mod health;
mod models;
mod routes;
mod scheduler;
mod search;
mod health;
mod util;
mod validate;
@@ -112,18 +111,16 @@ async fn main() -> std::io::Result<()> {
let storage_backend = dotenv::var("STORAGE_BACKEND").unwrap_or_else(|_| "local".to_string());
let file_host: Arc<dyn file_hosting::FileHost + Send + Sync> = if storage_backend == "backblaze"
{
Arc::new(
let file_host: Arc<dyn file_hosting::FileHost + Send + Sync> = match storage_backend.as_str() {
"backblaze" => Arc::new(
file_hosting::BackblazeHost::new(
&dotenv::var("BACKBLAZE_KEY_ID").unwrap(),
&dotenv::var("BACKBLAZE_KEY").unwrap(),
&dotenv::var("BACKBLAZE_BUCKET_ID").unwrap(),
)
.await,
)
} else if storage_backend == "s3" {
Arc::new(
),
"s3" => Arc::new(
S3Host::new(
&*dotenv::var("S3_BUCKET_NAME").unwrap(),
&*dotenv::var("S3_REGION").unwrap(),
@@ -132,30 +129,24 @@ async fn main() -> std::io::Result<()> {
&*dotenv::var("S3_SECRET").unwrap(),
)
.unwrap(),
)
} else if storage_backend == "local" {
Arc::new(file_hosting::MockHost::new())
} else {
panic!("Invalid storage backend specified. Aborting startup!")
),
"local" => Arc::new(file_hosting::MockHost::new()),
_ => panic!("Invalid storage backend specified. Aborting startup!"),
};
let mut scheduler = scheduler::Scheduler::new();
// The interval in seconds at which the local database is indexed
// for searching. Defaults to 1 hour if unset.
let local_index_interval = std::time::Duration::from_secs(
dotenv::var("LOCAL_INDEX_INTERVAL")
.ok()
.map(|i| i.parse().unwrap())
.unwrap_or(3600),
);
let local_index_interval =
std::time::Duration::from_secs(parse_var("LOCAL_INDEX_INTERVAL").unwrap_or(3600));
let pool_ref = pool.clone();
let thread_search_config = search_config.clone();
let mut skip = skip_initial;
let pool_ref = pool.clone();
let search_config_ref = search_config.clone();
scheduler.run(local_index_interval, move || {
let pool_ref = pool_ref.clone();
let thread_search_config = thread_search_config.clone();
let search_config_ref = search_config_ref.clone();
let local_skip = skip;
if skip {
skip = false;
@@ -166,7 +157,7 @@ async fn main() -> std::io::Result<()> {
}
info!("Indexing local database");
let settings = IndexingSettings { index_local: true };
let result = index_projects(pool_ref, settings, &thread_search_config).await;
let result = index_projects(pool_ref, settings, &search_config_ref).await;
if let Err(e) = result {
warn!("Local project indexing failed: {:?}", e);
}
@@ -219,12 +210,12 @@ async fn main() -> std::io::Result<()> {
let indexing_queue = Arc::new(search::indexing::queue::CreationQueue::new());
let queue_ref = indexing_queue.clone();
let thread_search_config = search_config.clone();
let mut skip = skip_initial;
let queue_ref = indexing_queue.clone();
let search_config_ref = search_config.clone();
scheduler.run(std::time::Duration::from_secs(15 * 60), move || {
let queue = queue_ref.clone();
let thread_search_config = thread_search_config.clone();
let queue_ref = queue_ref.clone();
let search_config_ref = search_config_ref.clone();
let local_skip = skip;
if skip {
skip = false;
@@ -234,7 +225,7 @@ async fn main() -> std::io::Result<()> {
return;
}
info!("Indexing created project queue");
let result = search::indexing::queue::index_queue(&*queue, &thread_search_config).await;
let result = queue_ref.index(&search_config_ref).await;
if let Err(e) = result {
warn!("Indexing created projects failed: {:?}", e);
}
@@ -250,12 +241,6 @@ async fn main() -> std::io::Result<()> {
};
let store = MemoryStore::new();
// Generate pod id
let pod = PodInfo::new();
// Init prometheus cluster
let mut labels = HashMap::new();
labels.insert("pod".to_string(), pod.pod_name);
labels.insert("node".to_string(), pod.node_name);
// Get prometheus service
let mut prometheus = PrometheusMetricsBuilder::new("api")
@@ -275,8 +260,8 @@ async fn main() -> std::io::Result<()> {
.wrap(health.clone())
.wrap(
Cors::default()
.allowed_methods(vec!["GET", "POST", "DELETE", "PATCH", "PUT"])
.allowed_headers(vec![http::header::AUTHORIZATION, http::header::ACCEPT])
.allowed_methods(["GET", "POST", "DELETE", "PATCH", "PUT"])
.allowed_headers([http::header::AUTHORIZATION, http::header::ACCEPT])
.allowed_header(http::header::CONTENT_TYPE)
.allow_any_origin()
.max_age(3600),
@@ -288,12 +273,8 @@ async fn main() -> std::io::Result<()> {
RateLimiter::new(MemoryStoreActor::from(store.clone()).start())
.with_identifier(|req| {
let connection_info = req.connection_info();
let ip = String::from(
if dotenv::var("CLOUDFLARE_INTEGRATION")
.ok()
.map(|i| i.parse().unwrap())
.unwrap_or(false)
{
let ip =
String::from(if parse_var("CLOUDFLARE_INTEGRATION").unwrap_or(false) {
if let Some(header) = req.headers().get("CF-Connecting-IP") {
header.to_str().map_err(|_| ARError::IdentificationError)?
} else {
@@ -305,13 +286,10 @@ async fn main() -> std::io::Result<()> {
connection_info
.remote_addr()
.ok_or(ARError::IdentificationError)?
},
);
});
let ignore_ips = dotenv::var("RATE_LIMIT_IGNORE_IPS")
.ok()
.and_then(|s| serde_json::from_str::<Vec<String>>(&s).ok())
.unwrap_or_else(Vec::new);
let ignore_ips =
parse_strings_from_var("RATE_LIMIT_IGNORE_IPS").unwrap_or_default();
if ignore_ips.contains(&ip) {
// At an even distribution of numbers, this will allow at the most
@@ -348,28 +326,19 @@ async fn main() -> std::io::Result<()> {
fn check_env_vars() -> bool {
let mut failed = false;
fn check_var<T: std::str::FromStr>(var: &str) -> bool {
if dotenv::var(var)
.ok()
.and_then(|s| s.parse::<T>().ok())
.is_none()
{
fn check_var<T: std::str::FromStr>(var: &'static str) -> bool {
let check = parse_var::<T>(var).is_none();
if check {
warn!(
"Variable `{}` missing in dotenv or not of type `{}`",
var,
std::any::type_name::<T>()
);
true
} else {
false
}
check
}
if dotenv::var("RATE_LIMIT_IGNORE_IPS")
.ok()
.and_then(|s| serde_json::from_str::<Vec<String>>(&s).ok())
.is_none()
{
if parse_strings_from_var("RATE_LIMIT_IGNORE_IPS").is_none() {
warn!("Variable `RATE_LIMIT_IGNORE_IPS` missing in dotenv or not a json array of strings");
failed |= true;
}
@@ -384,24 +353,31 @@ fn check_env_vars() -> bool {
failed |= check_var::<String>("STORAGE_BACKEND");
let storage_backend = dotenv::var("STORAGE_BACKEND").ok();
if storage_backend.as_deref() == Some("backblaze") {
failed |= check_var::<String>("BACKBLAZE_KEY_ID");
failed |= check_var::<String>("BACKBLAZE_KEY");
failed |= check_var::<String>("BACKBLAZE_BUCKET_ID");
} else if storage_backend.as_deref() == Some("s3") {
failed |= check_var::<String>("S3_ACCESS_TOKEN");
failed |= check_var::<String>("S3_SECRET");
failed |= check_var::<String>("S3_URL");
failed |= check_var::<String>("S3_REGION");
failed |= check_var::<String>("S3_BUCKET_NAME");
} else if storage_backend.as_deref() == Some("local") {
failed |= check_var::<String>("MOCK_FILE_PATH");
} else if let Some(backend) = storage_backend {
warn!("Variable `STORAGE_BACKEND` contains an invalid value: {}. Expected \"backblaze\", \"s3\", or \"local\".", backend);
failed |= true;
match storage_backend.as_deref() {
Some("backblaze") => {
failed |= check_var::<String>("BACKBLAZE_KEY_ID");
failed |= check_var::<String>("BACKBLAZE_KEY");
failed |= check_var::<String>("BACKBLAZE_BUCKET_ID");
}
Some("s3") => {
failed |= check_var::<String>("S3_ACCESS_TOKEN");
failed |= check_var::<String>("S3_SECRET");
failed |= check_var::<String>("S3_URL");
failed |= check_var::<String>("S3_REGION");
failed |= check_var::<String>("S3_BUCKET_NAME");
}
Some("local") => {
failed |= check_var::<String>("MOCK_FILE_PATH");
}
Some(backend) => {
warn!("Variable `STORAGE_BACKEND` contains an invalid value: {}. Expected \"backblaze\", \"s3\", or \"local\".", backend);
failed |= true;
}
_ => {
warn!("Variable `STORAGE_BACKEND` is not set!");
failed |= true;
}
}
failed |= check_var::<usize>("LOCAL_INDEX_INTERVAL");
failed |= check_var::<usize>("VERSION_INDEX_INTERVAL");