From 6ec2d7100944e1340bb93143223d247626f5447a Mon Sep 17 00:00:00 2001 From: Sergio <77530549+sergi0g@users.noreply.github.com> Date: Wed, 28 May 2025 12:13:31 +0300 Subject: [PATCH] feat: enable upgrading running images --- Cargo.lock | 11 ++- Cargo.toml | 4 +- src/check.rs | 2 + src/docker.rs | 224 +++++++++++++++++++++++++++++++++++++++++++++++--- src/server.rs | 124 +++++++++++++++++++++------- 5 files changed, 317 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b565a8d..22cd0eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,9 +164,9 @@ dependencies = [ [[package]] name = "bollard" -version = "0.18.1" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97ccca1260af6a459d75994ad5acc1651bcabcbdbc41467cc9786519ab854c30" +checksum = "af706e9dc793491dd382c99c22fde6e9934433d4cc0d6a4b34eb2cdc57a5c917" dependencies = [ "base64", "bollard-stubs", @@ -197,11 +197,12 @@ dependencies = [ [[package]] name = "bollard-stubs" -version = "1.47.1-rc.27.3.1" +version = "1.48.2-rc.28.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f179cfbddb6e77a5472703d4b30436bff32929c0aa8a9008ecf23d1d3cdd0da" +checksum = "79cdf0fccd5341b38ae0be74b74410bdd5eceeea8876dc149a13edfe57e3b259" dependencies = [ "serde", + "serde_json", "serde_repr", "serde_with", ] @@ -2556,6 +2557,8 @@ checksum = "dd4f8f16791ea2a8845f617f1e87887f917835e0603d01f03a51e638b9613d0c" dependencies = [ "futures-core", "pin-project-lite", + "serde", + "serde_json", "tokio", "xitca-http", "xitca-server", diff --git a/Cargo.toml b/Cargo.toml index 2cb750b..6f7656e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,9 @@ edition = "2021" clap = { version = "4.5.7", features = ["derive"] } indicatif = { version = "0.17.8", optional = true } tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] } -xitca-web = { version = "0.6.2", optional = true } +xitca-web = { version = "0.6.2", optional = true, features = ["json"]} liquid = { version = "0.26.6", optional = true } -bollard = "0.18.1" +bollard = "0.19.0" once_cell = "1.19.0" http-auth = { version = "0.1.9", default-features = false } termsize = { version = "0.1.8", optional = true } diff --git a/src/check.rs b/src/check.rs index dc3e0fb..6413016 100644 --- a/src/check.rs +++ b/src/check.rs @@ -209,7 +209,9 @@ pub async fn get_updates( } // Await all the futures let images = join_all(handles).await; + let mut updates: Vec = images.iter().map(|image| image.to_update()).collect(); + updates.extend_from_slice(&remote_updates); updates } diff --git a/src/docker.rs b/src/docker.rs index dc2b168..d0f5e42 100644 --- a/src/docker.rs +++ b/src/docker.rs @@ -1,9 +1,26 @@ -use bollard::{container::ListContainersOptions, models::ImageInspect, ClientVersion, Docker}; +use bollard::{ + models::ImageInspect, + query_parameters::{ + CreateContainerOptionsBuilder, CreateImageOptionsBuilder, InspectContainerOptions, + ListContainersOptionsBuilder, ListImagesOptions, ListServicesOptions, + RemoveContainerOptions, RenameContainerOptions, StartContainerOptions, + StopContainerOptions, + }, + secret::{ContainerCreateBody, CreateImageInfo}, + ClientVersion, Docker, +}; -use futures::future::join_all; +use futures::{future::join_all, StreamExt}; use rustc_hash::FxHashMap; -use crate::{error, structs::image::Image, Context}; +use crate::{ + error, + structs::{ + image::Image, + update::{Update, UpdateInfo}, + }, + Context, +}; fn create_docker_client(socket: Option<&str>) -> Docker { let client: Result = match socket { @@ -43,7 +60,7 @@ pub async fn get_images_from_docker_daemon( references: &Option>, ) -> Vec { let client: Docker = create_docker_client(ctx.config.socket.as_deref()); - let mut swarm_images = match client.list_services::(None).await { + let mut swarm_images = match client.list_services(None::).await { Ok(services) => services .iter() .filter_map(|service| match &service.spec { @@ -80,7 +97,7 @@ pub async fn get_images_from_docker_daemon( .collect() } None => { - let images = match client.list_images::(None).await { + let images = match client.list_images(None::).await { Ok(images) => images, Err(e) => { error!("Failed to retrieve list of images available!\n{}", e) @@ -99,13 +116,8 @@ pub async fn get_images_from_docker_daemon( pub async fn get_in_use_images(ctx: &Context) -> FxHashMap> { let client: Docker = create_docker_client(ctx.config.socket.as_deref()); - let containers = match client - .list_containers::(Some(ListContainersOptions { - all: true, - ..Default::default() - })) - .await - { + let options = ListContainersOptionsBuilder::new().all(true).build(); + let containers = match client.list_containers(Some(options)).await { Ok(containers) => containers, Err(e) => { error!("Failed to retrieve list of containers available!\n{}", e) @@ -149,3 +161,191 @@ pub async fn get_in_use_images(ctx: &Context) -> FxHashMap> }); result.clone() } + +/// Given a container name and the update information returned about the image it uses, tries to recreate it with a new image / latest version of the current image +pub async fn upgrade_container(ctx: &Context, name: &str, update: &Update) -> Result<(), String> { + let client: Docker = create_docker_client(ctx.config.socket.as_deref()); // TODO: Consider adding all these functions to a long lived struct with a shared client. We don't want to create a new client for every container updated. + + // Create a few variables that will be used later on + let new_name = format!("{name}__cup_temp"); // A new temporary name for the container. Instead of removing the old one straight away, we'll create a new one and if that succeeds we'll rename it. + let new_image = match &update.result.info { + // Find the new reference for the image, based on logic used in the web interface. This will be used to pull the new image + UpdateInfo::Version(update_info) => format!( + "{}:{}", + update + .reference + .split_once(':') + .expect("Reference contains `:`") + .0, + update_info.new_tag + ), + UpdateInfo::Digest(_) => update.reference.clone(), + UpdateInfo::None => unreachable!("Tried to update up-to-date image"), + }; + ctx.logger.debug(format!("Upgrading {name}...")); + + // Retrieve information about current container and construct required structs to create a new container afterwards + let (create_options, create_config) = match client + .inspect_container(name, None::) + .await + { + Ok(inspect) => { + let create_options = { + let mut options = CreateContainerOptionsBuilder::new(); + match inspect.name { + Some(_) => options = options.name(&new_name), + None => (), // Not sure if this is even reachable + }; + match inspect.platform { + Some(platform) => options = options.platform(&platform), + None => (), // Same as above + }; + options.build() + }; + + let inspect_config = inspect.config.unwrap(); // For easier access later + + let create_config = ContainerCreateBody { + hostname: inspect_config.hostname, + domainname: inspect_config.domainname, + user: inspect_config.user, + attach_stdin: inspect_config.attach_stdin, + attach_stderr: inspect_config.attach_stderr, + attach_stdout: inspect_config.attach_stdout, + exposed_ports: inspect_config.exposed_ports, + tty: inspect_config.tty, + open_stdin: inspect_config.open_stdin, + stdin_once: inspect_config.stdin_once, + env: inspect_config.env, + cmd: inspect_config.cmd, + healthcheck: inspect_config.healthcheck, + args_escaped: inspect_config.args_escaped, + image: Some(new_image.clone()), + volumes: inspect_config.volumes, + working_dir: inspect_config.working_dir, + entrypoint: inspect_config.entrypoint, + network_disabled: inspect_config.network_disabled, + mac_address: inspect_config.mac_address, + on_build: inspect_config.on_build, + labels: inspect_config.labels, + stop_signal: inspect_config.stop_signal, + stop_timeout: inspect_config.stop_timeout, + shell: inspect_config.shell, + host_config: inspect.host_config, + // The commented out code below doesn't work because bollard sends gw_priority as a float and Docker expects an int. Tracking issue: https://github.com/fussybeaver/bollard/issues/537 + // networking_config: Some(bollard::secret::NetworkingConfig { + // endpoints_config: inspect.network_settings.unwrap().networks, + // }), + networking_config: None, + }; + (create_options, create_config) + } + Err(e) => { + let message = format!("Failed to inspect container {name}: {e}"); + ctx.logger.warn(&message); + return Err(message) + }, + }; + + // Stop the current container + ctx.logger.debug(format!("Stopping {name}...")); + match client + .stop_container(name, None::) + .await + { + Ok(()) => ctx.logger.debug(format!("Successfully stopped {name}")), + Err(e) => { + let message = format!("Failed to stop container {name}: {e}"); + ctx.logger.warn(&message); + return Err(message) + }, + }; + + // Don't let the naming fool you, we're pulling the new image here. + ctx.logger.debug(format!("Pulling {new_image} for {name}...")); + let create_image_options = CreateImageOptionsBuilder::new() + .from_image(&new_image) + .build(); + + client + .create_image(Some(create_image_options), None, None) // TODO: credentials support + .collect::>>() // Not entirely sure this is the best way to handle a stream + .await; // TODO: handle errors here + ctx.logger.debug(format!("Successfully pulled new image for {name}")); + + // Create the new container + ctx.logger.debug(format!("Creating new container for {name}...")); + match client + .create_container(Some(create_options), create_config) + .await + { + Ok(response) => { + // Let the user know if any warnings occured + response + .warnings + .iter() + .for_each(|warning| ctx.logger.warn(format!("[DAEMON]: {}", warning))); + }, + Err(e) => { + let message = format!("Failed to create new container for {name}: {e}"); + ctx.logger.warn(&message); + return Err(message) + }, + }; + + // Start the new container + match client + .start_container(&new_name, None::) + .await + { + Ok(()) => ctx.logger.debug(format!("Successfully created new container for {name}")), + Err(e) => { + let message = format!("Failed to start new container for {name}: {e}"); + ctx.logger.warn(&message); + return Err(message) + }, + } + + // Remove the old container + ctx.logger.debug(format!("Removing old {name} container")); + match client + .remove_container(name, None::) + .await + { + Ok(()) => ctx.logger.debug(format!("Successfully removed old {name} container")), + Err(e) => { + match e { + bollard::errors::Error::DockerResponseServerError { status_code: 404, message } => { + ctx.logger.warn(format!("Failed to remove container {name}, it was probably started with `--rm` and has been automatically cleaned up. Message from server: {message}")) + }, + _ => { + let message = format!("Failed to remove container {name}: {e}"); + ctx.logger.warn(&message); + return Err(message) + }, + } + } + } + + // Rename the new container + match client + .rename_container( + &new_name, + RenameContainerOptions { + name: name.to_owned(), + }, + ) + .await + { + Ok(()) => (), + Err(e) => { + let message = format!("Failed to rename container {name}: {e}"); + ctx.logger.warn(&message); + return Err(message) + }, + } + + ctx.logger.debug(format!("Successfully upgraded {name}!")); + + Ok(()) +} diff --git a/src/server.rs b/src/server.rs index da50feb..ea77386 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,19 +1,20 @@ -use std::{env, sync::Arc}; +use std::{env, sync::Arc, time::SystemTime}; use chrono::Local; use chrono_tz::Tz; use liquid::{object, Object, ValueView}; use rustc_hash::FxHashMap; +use serde::Deserialize; use serde_json::Value; -use tokio::sync::Mutex; +use tokio::sync::RwLock; use tokio_cron_scheduler::{Job, JobScheduler}; use xitca_web::{ body::ResponseBody, bytes::Bytes, error::Error, - handler::{handler_service, path::PathRef, state::StateRef}, + handler::{handler_service, json::LazyJson, path::PathRef, state::StateRef}, http::{StatusCode, WebResponse}, - route::get, + route::{get, post}, service::Service, App, WebContext, }; @@ -21,6 +22,7 @@ use xitca_web::{ use crate::{ check::get_updates, config::Theme, + docker::upgrade_container, error, structs::update::Update, utils::{ @@ -38,6 +40,10 @@ const FAVICON_ICO: Bytes = Bytes::from_static(include_bytes!("static/favicon.ico const FAVICON_SVG: Bytes = Bytes::from_static(include_bytes!("static/favicon.svg")); const APPLE_TOUCH_ICON: Bytes = Bytes::from_static(include_bytes!("static/apple-touch-icon.png")); +const SUCCESS_STATUS: &str = r#"{"success":true}"#; // Store this to avoid recomputation +const UPGRADE_INTERNAL_SERVER_ERROR: &str = + r#"{"success":"false","message":"Internal server error. Please view logs for details"}"#; + const SORT_ORDER: [&str; 8] = [ "monitored_images", "updates_available", @@ -53,7 +59,7 @@ pub async fn serve(port: &u16, ctx: &Context) -> std::io::Result<()> { ctx.logger.info("Starting server, please wait..."); let data = ServerData::new(ctx).await; let scheduler = JobScheduler::new().await.unwrap(); - let data = Arc::new(Mutex::new(data)); + let data = Arc::new(RwLock::new(data)); let data_copy = data.clone(); let tz = env::var("TZ") .map(|tz| tz.parse().unwrap_or(Tz::UTC)) @@ -67,7 +73,7 @@ pub async fn serve(port: &u16, ctx: &Context) -> std::io::Result<()> { move |_uuid, _lock| { let data_copy = data_copy.clone(); Box::pin(async move { - data_copy.lock().await.refresh().await; + data_copy.write().await.refresh().await; }) }, ) { @@ -92,7 +98,10 @@ pub async fn serve(port: &u16, ctx: &Context) -> std::io::Result<()> { let mut app_builder = App::new() .with_state(data) .at("/api/v3/json", get(handler_service(json))) - .at("/api/v3/refresh", get(handler_service(refresh))); + .at("/api/v3/refresh", get(handler_service(refresh_v3))) + .at("/api/v4/json", get(handler_service(json))) + .at("/api/v4/refresh", get(handler_service(refresh_v4))) + .at("/api/v4/upgrade", post(handler_service(upgrade))); if !ctx.config.agent { app_builder = app_builder .at("/", get(handler_service(_static))) @@ -110,17 +119,17 @@ pub async fn serve(port: &u16, ctx: &Context) -> std::io::Result<()> { .wait() } -async fn _static(data: StateRef<'_, Arc>>, path: PathRef<'_>) -> WebResponse { +async fn _static(data: StateRef<'_, Arc>>, path: PathRef<'_>) -> WebResponse { match path.0 { "/" => WebResponse::builder() .header("Content-Type", "text/html") - .body(ResponseBody::from(data.lock().await.template.clone())) + .body(ResponseBody::from(data.read().await.template.clone())) .unwrap(), "/assets/index.js" => WebResponse::builder() .header("Content-Type", "text/javascript") .body(ResponseBody::from(JS.replace( "=\"neutral\"", - &format!("=\"{}\"", data.lock().await.theme), + &format!("=\"{}\"", data.read().await.theme), ))) .unwrap(), "/assets/index.css" => WebResponse::builder() @@ -146,20 +155,56 @@ async fn _static(data: StateRef<'_, Arc>>, path: PathRef<'_>) } } -async fn json(data: StateRef<'_, Arc>>) -> WebResponse { +async fn json(data: StateRef<'_, Arc>>) -> WebResponse { WebResponse::builder() .header("Content-Type", "application/json") .body(ResponseBody::from( - data.lock().await.json.clone().to_string(), + data.read().await.json.clone().to_string(), )) .unwrap() } -async fn refresh(data: StateRef<'_, Arc>>) -> WebResponse { - data.lock().await.refresh().await; +async fn refresh_v3(data: StateRef<'_, Arc>>) -> WebResponse { + data.write().await.refresh().await; WebResponse::new(ResponseBody::from("OK")) } +async fn refresh_v4(data: StateRef<'_, Arc>>) -> WebResponse { + data.write().await.refresh().await; + WebResponse::new(ResponseBody::from(SUCCESS_STATUS)) +} + +#[derive(Deserialize)] +struct UpgradeRequest { + name: String, // Container name to be upgraded +} + +async fn upgrade( + data: StateRef<'_, Arc>>, + body: LazyJson, +) -> WebResponse { + let data = data.read().await; + let UpgradeRequest { name } = match body.deserialize::() { + Ok(ur) => ur, + Err(e) => { + return WebResponse::builder().status(StatusCode::BAD_REQUEST).body(ResponseBody::from(serde_json::json!({"success": "false", "message": format!("Invalid JSON payload: {e}")}).to_string())).unwrap() + } + }; + match data.raw_updates.iter().find(|update| { + update.used_by.contains(&name) + && update.status.to_option_bool().is_some_and(|status| status) + }) { + Some(update) => match upgrade_container(&data.ctx, &name, update).await { + Ok(()) => WebResponse::new(ResponseBody::from(SUCCESS_STATUS)), + Err(_) => WebResponse::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(ResponseBody::from(UPGRADE_INTERNAL_SERVER_ERROR)) + .unwrap(), + }, + None => WebResponse::builder().status(StatusCode::BAD_REQUEST).body(ResponseBody::from(serde_json::json!({"success": "false", "message": format!("Container `{name}` does not exist or has no updates")}).to_string())).unwrap(), + } +} + struct ServerData { template: String, raw_updates: Vec, @@ -175,7 +220,10 @@ impl ServerData { template: String::new(), json: Value::Null, raw_updates: Vec::new(), - theme: "neutral", + theme: match ctx.config.theme { + Theme::Default => "neutral", + Theme::Blue => "gray", + }, }; s.refresh().await; s @@ -203,10 +251,6 @@ impl ServerData { .to_rfc3339_opts(chrono::SecondsFormat::Secs, true) .to_string() .into(); - self.theme = match &self.ctx.config.theme { - Theme::Default => "neutral", - Theme::Blue => "gray", - }; let mut metrics = self.json["metrics"] .as_object() .unwrap() @@ -257,17 +301,11 @@ where let method = request.method().to_string(); let url = request.uri().to_string(); - if &method != "GET" { - // We only allow GET requests - - log(&method, &url, 405, elapsed(start)); - Err(Error::from(StatusCode::METHOD_NOT_ALLOWED)) - } else { - let res = next.call(ctx).await?; - let status = res.status().as_u16(); - - log(&method, &url, status, elapsed(start)); - Ok(res) + match (method.as_str(), url.as_str()) { + ("POST", "/api/v4/upgrade") => continue_request(ctx, next, &method, &url, start).await, + ("GET", "/api/v4/upgrade") | ("POST", _) => return_405(&method, &url, start).await, + ("GET", _) => continue_request(ctx, next, &method, &url, start).await, + (_, _) => return_405(&method, &url, start).await, } } @@ -284,3 +322,29 @@ fn log(method: &str, url: &str, status: u16, time: u32) { method, url, color, status, time ) } + +async fn continue_request( + ctx: WebContext<'_, C, B>, + next: &S, + method: &str, + url: &str, + start: SystemTime, +) -> Result> +where + S: for<'r> Service, Response = WebResponse, Error = Error>, +{ + let res = next.call(ctx).await?; + let status = res.status().as_u16(); + + log(&method, &url, status, elapsed(start)); + Ok(res) +} + +async fn return_405( + method: &str, + url: &str, + start: SystemTime, +) -> Result> { + log(&method, &url, 405, elapsed(start)); + Err(Error::from(StatusCode::METHOD_NOT_ALLOWED)) +}