m/cup
1
0
mirror of https://github.com/sergi0g/cup.git synced 2025-11-08 05:03:49 -05:00

Removed all threading and switched everything to async. >2x speedup 🚀

This commit is contained in:
Sergio
2024-09-15 18:47:00 +03:00
parent 38bf187a4a
commit 0c9ad61a4d
8 changed files with 804 additions and 283 deletions

777
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,18 +6,20 @@ edition = "2021"
[dependencies]
clap = { version = "4.5.7", features = ["derive"] }
indicatif = { version = "0.17.8", optional = true }
tokio = {version = "1.38.0", features = ["rt", "rt-multi-thread", "macros"]}
ureq = { version = "2.9.7", features = ["tls"] }
rayon = "1.10.0"
tokio = {version = "1.38.0", features = ["macros"]}
xitca-web = { version = "0.5.0", optional = true, features = ["logger"] }
liquid = { version = "0.26.6", optional = true }
bollard = "0.16.1"
once_cell = "1.19.0"
http-auth = { version = "0.1.9", features = [] }
http-auth = { version = "0.1.9", default-features = false, features = [] }
termsize = { version = "0.1.8", optional = true }
regex = "1.10.5"
chrono = { version = "0.4.38", default-features = false, features = ["std", "alloc", "clock"], optional = true }
json = "0.12.4"
reqwest = "0.12.7"
futures = "0.3.30"
reqwest-retry = "0.6.1"
reqwest-middleware = "0.3.3"
[features]
default = ["server", "cli"]

View File

@@ -1,16 +1,12 @@
use std::{
collections::{HashMap, HashSet},
sync::Mutex,
};
use std::collections::{HashMap, HashSet};
use json::JsonValue;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use crate::{
docker::get_images_from_docker_daemon,
image::Image,
registry::{check_auth, get_latest_digests, get_token},
utils::unsplit_image,
utils::{new_reqwest_client, unsplit_image},
};
#[cfg(feature = "cli")]
@@ -37,49 +33,48 @@ pub async fn get_all_updates(
socket: Option<String>,
config: &JsonValue,
) -> Vec<(String, Option<bool>)> {
let image_map_mutex: Mutex<HashMap<String, &Option<String>>> = Mutex::new(HashMap::new());
let local_images = get_images_from_docker_daemon(socket).await;
local_images.par_iter().for_each(|image| {
let mut image_map: HashMap<String, Option<String>> = HashMap::with_capacity(local_images.len());
for image in &local_images {
let img = unsplit_image(&image.registry, &image.repository, &image.tag);
image_map_mutex.lock().unwrap().insert(img, &image.digest);
});
let image_map = image_map_mutex.lock().unwrap().clone();
image_map.insert(img, image.digest.clone());
};
let mut registries: Vec<&String> = local_images
.par_iter()
.iter()
.map(|image| &image.registry)
.collect();
registries.unique();
let mut remote_images: Vec<Image> = Vec::new();
let mut remote_images: Vec<Image> = Vec::with_capacity(local_images.len());
let client = new_reqwest_client();
for registry in registries {
let images: Vec<&Image> = local_images
.par_iter()
.iter()
.filter(|image| &image.registry == registry)
.collect();
let credentials = config["authentication"][registry]
.clone()
.take_string()
.or(None);
let mut latest_images = match check_auth(registry, config) {
let mut latest_images = match check_auth(registry, config, &client).await {
Some(auth_url) => {
let token = get_token(images.clone(), &auth_url, &credentials);
get_latest_digests(images, Some(&token), config)
let token = get_token(images.clone(), &auth_url, &credentials, &client).await;
get_latest_digests(images, Some(&token), config, &client).await
}
None => get_latest_digests(images, None, config),
None => get_latest_digests(images, None, config, &client).await,
};
remote_images.append(&mut latest_images);
}
let result_mutex: Mutex<Vec<(String, Option<bool>)>> = Mutex::new(Vec::new());
remote_images.par_iter().for_each(|image| {
let mut result: Vec<(String, Option<bool>)> = Vec::new();
remote_images.iter().for_each(|image| {
let img = unsplit_image(&image.registry, &image.repository, &image.tag);
match &image.digest {
Some(d) => {
let r = d != image_map.get(&img).unwrap().as_ref().unwrap();
result_mutex.lock().unwrap().push((img, Some(r)))
result.push((img, Some(r)))
}
None => result_mutex.lock().unwrap().push((img, None)),
None => result.push((img, None)),
}
});
let result = result_mutex.lock().unwrap().clone();
result
}
@@ -90,13 +85,14 @@ pub async fn get_update(image: &str, socket: Option<String>, config: &JsonValue)
.clone()
.take_string()
.or(None);
let token = match check_auth(&local_image.registry, config) {
Some(auth_url) => get_token(vec![&local_image], &auth_url, &credentials),
let client = new_reqwest_client();
let token = match check_auth(&local_image.registry, config, &client).await {
Some(auth_url) => get_token(vec![&local_image], &auth_url, &credentials, &client).await,
None => String::new(),
};
let remote_image = match token.as_str() {
"" => get_latest_digest(&local_image, None, config),
_ => get_latest_digest(&local_image, Some(&token), config),
"" => get_latest_digest(&local_image, None, config, &client).await,
_ => get_latest_digest(&local_image, Some(&token), config, &client).await,
};
match &remote_image.digest {
Some(d) => Some(d != &local_image.digest.unwrap()),

View File

@@ -2,6 +2,7 @@ use bollard::{secret::ImageSummary, ClientVersion, Docker};
#[cfg(feature = "cli")]
use bollard::secret::ImageInspect;
use futures::future::join_all;
use crate::{error, image::Image, utils::split_image};
@@ -32,27 +33,16 @@ pub async fn get_images_from_docker_daemon(socket: Option<String>) -> Vec<Image>
error!("Failed to retrieve list of images available!\n{}", e)
}
};
let mut result: Vec<Image> = Vec::new();
let mut handles = Vec::new();
for image in images {
if !image.repo_tags.is_empty() && !image.repo_digests.is_empty() {
for t in &image.repo_tags {
let (registry, repository, tag) = split_image(t);
result.push(Image {
registry,
repository,
tag,
digest: Some(
image.repo_digests[0]
.clone()
.split('@')
.collect::<Vec<&str>>()[1]
.to_string(),
),
});
}
handles.push(Image::from(image))
};
join_all(handles).await.iter().filter(|img| {
match img {
Some(_) => true,
None => false
}
}
result
}).map(|img| img.clone().unwrap()).collect()
}
#[cfg(feature = "cli")]

View File

@@ -8,7 +8,7 @@ use crate::utils::{sort_update_vec, to_json};
pub fn print_updates(updates: &[(String, Option<bool>)], icons: &bool) {
let sorted_updates = sort_update_vec(updates);
let term_width: usize = termsize::get()
.unwrap_or(termsize::Size { rows: 24, cols: 80 })
.unwrap_or_else(|| termsize::Size { rows: 24, cols: 80 })
.cols as usize;
for update in sorted_updates {
let description = match update.1 {

View File

@@ -1,3 +1,7 @@
use bollard::secret::ImageSummary;
use crate::utils::split_image;
#[derive(Clone, Debug)]
pub struct Image {
pub registry: String,
@@ -5,3 +9,27 @@ pub struct Image {
pub tag: String,
pub digest: Option<String>,
}
impl Image {
pub async fn from(image: ImageSummary) -> Option<Self> {
if !image.repo_tags.is_empty() && !image.repo_digests.is_empty() {
for t in &image.repo_tags {
let (registry, repository, tag) = split_image(t);
let image = Image {
registry,
repository,
tag,
digest: Some(
image.repo_digests[0]
.clone()
.split('@')
.collect::<Vec<&str>>()[1]
.to_string(),
),
};
return Some(image)
}
}
None
}
}

View File

@@ -1,160 +1,136 @@
use std::sync::Mutex;
use futures::future::join_all;
use json::JsonValue;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use ureq::{Error, ErrorKind};
use http_auth::parse_challenges;
use reqwest_middleware::ClientWithMiddleware;
use crate::{error, image::Image, warn};
pub fn check_auth(registry: &str, config: &JsonValue) -> Option<String> {
pub async fn check_auth(registry: &str, config: &JsonValue, client: &ClientWithMiddleware) -> Option<String> {
let protocol = if config["insecure_registries"].contains(registry) {
"http"
} else {
"https"
};
let response = ureq::get(&format!("{}://{}/v2/", protocol, registry)).call();
let response = client.get(&format!("{}://{}/v2/", protocol, registry)).send().await;
match response {
Ok(_) => None,
Err(Error::Status(401, response)) => match response.header("www-authenticate") {
Some(challenge) => Some(parse_www_authenticate(challenge)),
None => error!(
"Unauthorized to access registry {} and no way to authenticate was provided",
registry
),
Ok(r) => {
let status = r.status().as_u16();
if status == 401 {
match r.headers().get("www-authenticate") {
Some(challenge) => Some(parse_www_authenticate(challenge.to_str().unwrap())),
None => error!(
"Unauthorized to access registry {} and no way to authenticate was provided",
registry
),
}
} else if status == 200 {
None
} else {
warn!("Received unexpected status code {}\nResponse: {}", status, r.text().await.unwrap());
None
}
},
Err(Error::Transport(error)) => {
match error.kind() {
ErrorKind::Dns => {
warn!("Failed to lookup the IP of the registry, retrying.");
return check_auth(registry, config);
} // If something goes really wrong, this can get stuck in a loop
ErrorKind::ConnectionFailed => {
warn!("Connection probably timed out, retrying.");
return check_auth(registry, config);
} // Same here
_ => error!("{}", error),
Err(e) => {
if e.is_connect() {
warn!("Connection to registry {} failed.", &registry);
None
} else {
error!("Unexpected error: {}", e.to_string())
}
}
Err(e) => error!("{}", e),
}
}
pub fn get_latest_digest(image: &Image, token: Option<&String>, config: &JsonValue) -> Image {
pub async fn get_latest_digest(image: &Image, token: Option<&String>, config: &JsonValue, client: &ClientWithMiddleware) -> Image {
let protocol =
if config["insecure_registries"].contains(json::JsonValue::from(image.registry.clone())) {
"http"
} else {
"https"
};
let mut request = ureq::head(&format!(
let mut request = client.head(&format!(
"{}://{}/v2/{}/manifests/{}",
protocol, &image.registry, &image.repository, &image.tag
));
if let Some(t) = token {
request = request.set("Authorization", &format!("Bearer {}", t));
request = request.header("Authorization", &format!("Bearer {}", t));
}
let raw_response = match request
.set("Accept", "application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.index.v1+json")
.call()
.header("Accept", "application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.index.v1+json")
.send().await
{
Ok(response) => response,
Err(Error::Status(401, response)) => {
if token.is_some() {
warn!("Failed to authenticate to registry {} with given token!\n{}", &image.registry, token.unwrap());
Ok(response) => {
let status = response.status();
if status == 401 {
if token.is_some() {
warn!("Failed to authenticate to registry {} with given token!\n{}", &image.registry, token.unwrap());
} else {
warn!("Registry requires authentication");
}
return Image { digest: None, ..image.clone() }
} else if status == 404 {
warn!("Image {:?} not found", &image);
return Image { digest: None, ..image.clone() }
} else {
return get_latest_digest(
image,
Some(&get_token(
vec![image],
&parse_www_authenticate(response.header("www-authenticate").unwrap()),
&None // I think?
)),
config
);
}
}
Err(Error::Status(_, _)) => {
return Image {
digest: None,
..image.clone()
response
}
},
Err(Error::Transport(error)) => {
match error.kind() {
ErrorKind::Dns => {
warn!("Failed to lookup the IP of the registry, retrying.");
return get_latest_digest(image, token, config)
}, // If something goes really wrong, this can get stuck in a loop
ErrorKind::ConnectionFailed => {
warn!("Connection probably timed out, retrying.");
return get_latest_digest(image, token, config)
}, // Same here
_ => error!("Failed to retrieve image digest\n{}!", error)
Err(e) => {
if e.is_connect() {
warn!("Connection to registry failed.");
return Image { digest: None, ..image.clone() }
} else {
error!("Unexpected error: {}", e.to_string())
}
},
};
match raw_response.header("docker-content-digest") {
match raw_response.headers().get("docker-content-digest") {
Some(digest) => Image {
digest: Some(digest.to_string()),
digest: Some(digest.to_str().unwrap().to_string()),
..image.clone()
},
None => error!("Server returned invalid response! No docker-content-digest!"),
None => error!("Server returned invalid response! No docker-content-digest!\n{:#?}", raw_response),
}
}
pub fn get_latest_digests(
pub async fn get_latest_digests(
images: Vec<&Image>,
token: Option<&String>,
config: &JsonValue,
client: &ClientWithMiddleware
) -> Vec<Image> {
let result: Mutex<Vec<Image>> = Mutex::new(Vec::new());
images.par_iter().for_each(|&image| {
let digest = get_latest_digest(image, token, config).digest;
result.lock().unwrap().push(Image {
digest,
..image.clone()
});
});
let r = result.lock().unwrap().clone();
r
let mut handles = Vec::new();
for image in images {
handles.push(get_latest_digest(image, token, config, client))
}
join_all(handles).await
}
pub fn get_token(images: Vec<&Image>, auth_url: &str, credentials: &Option<String>) -> String {
pub async fn get_token(images: Vec<&Image>, auth_url: &str, credentials: &Option<String>, client: &ClientWithMiddleware) -> String {
let mut final_url = auth_url.to_owned();
for image in &images {
final_url = format!("{}&scope=repository:{}:pull", final_url, image.repository);
}
let mut base_request =
ureq::get(&final_url).set("Accept", "application/vnd.oci.image.index.v1+json"); // Seems to be unnecesarry. Will probably remove in the future
client.get(&final_url).header("Accept", "application/vnd.oci.image.index.v1+json"); // Seems to be unnecessary. Will probably remove in the future
base_request = match credentials {
Some(creds) => base_request.set("Authorization", &format!("Basic {}", creds)),
Some(creds) => base_request.header("Authorization", &format!("Basic {}", creds)),
None => base_request,
};
let raw_response = match base_request.call() {
Ok(response) => match response.into_string() {
let raw_response = match base_request.send().await {
Ok(response) => match response.text().await {
Ok(res) => res,
Err(e) => {
error!("Failed to parse response into string!\n{}", e)
}
},
Err(Error::Transport(error)) => {
match error.kind() {
ErrorKind::Dns => {
warn!("Failed to lookup the IP of the registry, retrying.");
return get_token(images, auth_url, credentials);
} // If something goes really wrong, this can get stuck in a loop
ErrorKind::ConnectionFailed => {
warn!("Connection probably timed out, retrying.");
return get_token(images, auth_url, credentials);
} // Same here
_ => error!("Token request failed\n{}!", error),
}
}
Err(e) => {
error!("Token request failed!\n{}", e)
if e.is_connect() {
error!("Connection to registry failed.");
} else {
error!("Token request failed!\n{}", e.to_string())
}
}
};
let parsed_token_response: JsonValue = match json::parse(&raw_response) {

View File

@@ -3,6 +3,8 @@ use std::path::PathBuf;
use json::{object, JsonValue};
use once_cell::sync::Lazy;
use regex::Regex;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
/// This macro is an alternative to panic. It prints the message you give it and exits the process with code 1, without printing a stack trace. Useful for when the program has to exit due to a user error or something unexpected which is unrelated to the program (e.g. a failed web request)
#[macro_export]
@@ -21,14 +23,15 @@ macro_rules! warn {
})
}
static RE: Lazy<Regex> = Lazy::new(|| {
Regex::new(
r#"^(?P<name>(?:(?P<registry>(?:(?:localhost|[\w-]+(?:\.[\w-]+)+)(?::\d+)?)|[\w]+:\d+)/)?(?P<repository>[a-z0-9_.-]+(?:/[a-z0-9_.-]+)*))(?::(?P<tag>[\w][\w.-]{0,127}))?$"#, // From https://regex101.com/r/nmSDPA/1
)
.unwrap()
});
/// Takes an image and splits it into registry, repository and tag. For example ghcr.io/sergi0g/cup:latest becomes ['ghcr.io', 'sergi0g/cup', 'latest'].
pub fn split_image(image: &str) -> (String, String, String) {
static RE: Lazy<Regex> = Lazy::new(|| {
Regex::new(
r#"^(?P<name>(?:(?P<registry>(?:(?:localhost|[\w-]+(?:\.[\w-]+)+)(?::\d+)?)|[\w]+:\d+)/)?(?P<repository>[a-z0-9_.-]+(?:/[a-z0-9_.-]+)*))(?::(?P<tag>[\w][\w.-]{0,127}))?$"#, // From https://regex101.com/r/nmSDPA/1
)
.unwrap()
});
match RE.captures(image) {
Some(c) => {
let registry = match c.name("registry") {
@@ -124,21 +127,24 @@ pub fn to_json(updates: &[(String, Option<bool>)]) -> JsonValue {
let up_to_date = updates
.iter()
.filter(|&(_, value)| *value == Some(false))
.collect::<Vec<&(String, Option<bool>)>>()
.len();
.count();
let update_available = updates
.iter()
.filter(|&(_, value)| *value == Some(true))
.collect::<Vec<&(String, Option<bool>)>>()
.len();
.count();
let unknown = updates
.iter()
.filter(|&(_, value)| value.is_none())
.collect::<Vec<&(String, Option<bool>)>>()
.len();
.count();
let _ = json_data["metrics"].insert("monitored_images", updates.len());
let _ = json_data["metrics"].insert("up_to_date", up_to_date);
let _ = json_data["metrics"].insert("update_available", update_available);
let _ = json_data["metrics"].insert("unknown", unknown);
json_data
}
pub fn new_reqwest_client() -> ClientWithMiddleware {
ClientBuilder::new(reqwest::Client::new())
.with(RetryTransientMiddleware::new_with_policy(ExponentialBackoff::builder().build_with_max_retries(3)))
.build()
}