Skip to main content

hydro_deploy/
terraform.rs

1use std::collections::HashMap;
2use std::ffi::OsStr;
3use std::io::{BufRead, BufReader};
4#[cfg(unix)]
5use std::os::unix::process::CommandExt;
6use std::path::PathBuf;
7use std::process::{Child, ChildStdout, Command};
8use std::sync::{Arc, OnceLock, RwLock};
9
10use anyhow::{Context, Result, bail};
11use async_process::Stdio;
12use serde::{Deserialize, Serialize};
13use tempfile::TempDir;
14
15use super::progress::ProgressTracker;
16
17pub static TERRAFORM_ALPHABET: [char; 16] = [
18    '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b', 'c', 'd', 'e', 'f',
19];
20
21static TERRAFORM_PATH: OnceLock<PathBuf> = OnceLock::new();
22
23/// Returns the path to the terraform-compatible CLI (tofu or terraform).
24/// Prefers `tofu` if available, otherwise falls back to `terraform`.
25/// The result is cached in a `OnceLock` for subsequent calls.
26fn get_terraform_path() -> &'static PathBuf {
27    TERRAFORM_PATH.get_or_init(|| {
28        which::which("tofu")
29            .or_else(|_| which::which("terraform"))
30            .expect("Neither `tofu` nor `terraform` found in PATH. Please install one of them.")
31    })
32}
33
34fn terraform_command() -> Command {
35    Command::new(get_terraform_path())
36}
37
38/// Returns the name of the terraform-compatible CLI being used (for display purposes).
39pub fn terraform_name() -> &'static str {
40    get_terraform_path()
41        .file_name()
42        .and_then(OsStr::to_str)
43        .unwrap_or("terraform")
44}
45
46/// Keeps track of resources which may need to be cleaned up.
47#[derive(Default)]
48pub struct TerraformPool {
49    counter: u32,
50    active_applies: HashMap<u32, Arc<tokio::sync::RwLock<TerraformApply>>>,
51}
52
53impl TerraformPool {
54    fn create_apply(
55        &mut self,
56        deployment_folder: TempDir,
57    ) -> Result<(u32, Arc<tokio::sync::RwLock<TerraformApply>>)> {
58        let next_counter = self.counter;
59        self.counter += 1;
60
61        let mut apply_command = terraform_command();
62
63        apply_command
64            .current_dir(deployment_folder.path())
65            .arg("apply")
66            .arg("-auto-approve")
67            .arg("-no-color")
68            .arg("-parallelism=128");
69
70        #[cfg(unix)]
71        {
72            apply_command.process_group(0);
73        }
74
75        let spawned_child = apply_command
76            .stdout(Stdio::piped())
77            .stderr(Stdio::piped())
78            .spawn()
79            .with_context(|| format!("Failed to spawn `{}`. Is it installed?", terraform_name()))?;
80
81        let spawned_id = spawned_child.id();
82
83        let deployment = Arc::new(tokio::sync::RwLock::new(TerraformApply {
84            child: Some((spawned_id, Arc::new(RwLock::new(spawned_child)))),
85            deployment_folder: Some(deployment_folder),
86        }));
87
88        self.active_applies.insert(next_counter, deployment.clone());
89
90        Ok((next_counter, deployment))
91    }
92
93    fn drop_apply(&mut self, counter: u32) {
94        self.active_applies.remove(&counter);
95    }
96}
97
98impl Drop for TerraformPool {
99    fn drop(&mut self) {
100        #[expect(
101            clippy::disallowed_methods,
102            reason = "nondeterministic iteration order, fine for assertions"
103        )]
104        for (_, apply) in self.active_applies.drain() {
105            debug_assert_eq!(Arc::strong_count(&apply), 1);
106        }
107    }
108}
109
110#[derive(Serialize, Deserialize)]
111pub struct TerraformBatch {
112    pub terraform: TerraformConfig,
113    #[serde(skip_serializing_if = "HashMap::is_empty")]
114    pub provider: HashMap<String, serde_json::Value>,
115    #[serde(skip_serializing_if = "HashMap::is_empty")]
116    pub data: HashMap<String, HashMap<String, serde_json::Value>>,
117    pub resource: HashMap<String, HashMap<String, serde_json::Value>>,
118    pub output: HashMap<String, TerraformOutput>,
119}
120
121impl Default for TerraformBatch {
122    fn default() -> TerraformBatch {
123        TerraformBatch {
124            terraform: TerraformConfig {
125                required_providers: HashMap::new(),
126            },
127            provider: HashMap::new(),
128            data: HashMap::new(),
129            resource: HashMap::new(),
130            output: HashMap::new(),
131        }
132    }
133}
134
135impl TerraformBatch {
136    pub async fn provision(self, pool: &mut TerraformPool) -> Result<TerraformResult> {
137        // Hack to quiet false-positive `clippy::needless_pass_by_ref_mut` on latest nightlies.
138        // TODO(mingwei): Remove this when it is no longer needed (current date 2023-08-30).
139        // https://github.com/rust-lang/rust-clippy/issues/11380
140        let pool = std::convert::identity(pool);
141
142        if self.terraform.required_providers.is_empty()
143            && self.resource.is_empty()
144            && self.data.is_empty()
145            && self.output.is_empty()
146        {
147            return Ok(TerraformResult {
148                outputs: HashMap::new(),
149                deployment_folder: None,
150            });
151        }
152
153        ProgressTracker::with_group(terraform_name(), Some(1), || async {
154            let dothydro_folder = std::env::current_dir().unwrap().join(".hydro");
155            std::fs::create_dir_all(&dothydro_folder).unwrap();
156            let deployment_folder = tempfile::tempdir_in(dothydro_folder).unwrap();
157
158            std::fs::write(
159                deployment_folder.path().join("main.tf.json"),
160                serde_json::to_string(&self).unwrap(),
161            )
162            .unwrap();
163
164            if !terraform_command()
165                .current_dir(deployment_folder.path())
166                .arg("init")
167                .stdout(Stdio::null())
168                .spawn()
169                .with_context(|| {
170                    format!("Failed to spawn `{}`. Is it installed?", terraform_name())
171                })?
172                .wait()
173                .with_context(|| format!("Failed to launch {} init command", terraform_name()))?
174                .success()
175            {
176                bail!("Failed to initialize {}", terraform_name());
177            }
178
179            let (apply_id, apply) = pool.create_apply(deployment_folder)?;
180
181            let output = ProgressTracker::with_group(
182                "apply",
183                #[expect(
184                    clippy::disallowed_methods,
185                    reason = "nondeterministic iteration order, fine for addition"
186                )]
187                Some(self.resource.values().map(|r| r.len()).sum()),
188                || async { apply.write().await.output().await },
189            )
190            .await;
191            pool.drop_apply(apply_id);
192            output
193        })
194        .await
195    }
196}
197
198struct TerraformApply {
199    child: Option<(u32, Arc<RwLock<Child>>)>,
200    deployment_folder: Option<TempDir>,
201}
202
203async fn display_apply_outputs(stdout: &mut ChildStdout) {
204    let lines = BufReader::new(stdout).lines();
205    let mut waiting_for_result = HashMap::new();
206
207    for line in lines {
208        if let Ok(line) = line {
209            let mut split = line.split(':');
210            if let Some(first) = split.next()
211                && first.chars().all(|c| c != ' ')
212                && split.next().is_some()
213                && split.next().is_none()
214            {
215                if line.starts_with("Plan:")
216                    || line.starts_with("Outputs:")
217                    || line.contains(": Still creating...")
218                    || line.contains(": Reading...")
219                    || line.contains(": Still reading...")
220                    || line.contains(": Read complete after")
221                {
222                } else if line.ends_with(": Creating...") {
223                    let id = line.split(':').next().unwrap().trim();
224                    let (channel_send, channel_recv) = tokio::sync::oneshot::channel();
225                    waiting_for_result.insert(
226                        id.to_owned(),
227                        (
228                            channel_send,
229                            tokio::task::spawn(ProgressTracker::leaf(id.to_owned(), async move {
230                                // `Err(RecvError)` means send side was dropped due to another error.
231                                // Ignore here to prevent spurious panic stack traces.
232                                let _result = channel_recv.await;
233                            })),
234                        ),
235                    );
236                } else if line.contains(": Creation complete after") {
237                    let id = line.split(':').next().unwrap().trim();
238                    let (sender, to_await) = waiting_for_result.remove(id).unwrap();
239                    let _ = sender.send(());
240                    to_await.await.unwrap();
241                } else {
242                    panic!("Unexpected from {}: {}", terraform_name(), line);
243                }
244            }
245        } else {
246            break;
247        }
248    }
249}
250
251fn filter_terraform_logs(child: &mut Child) {
252    let lines = BufReader::new(child.stdout.take().unwrap()).lines();
253    let tf_name = terraform_name();
254    for line in lines {
255        if let Ok(line) = line {
256            let mut split = line.split(':');
257            if let Some(first) = split.next()
258                && first.chars().all(|c| c != ' ')
259                && split.next().is_some()
260                && split.next().is_none()
261            {
262                eprintln!("[{}] {}", tf_name, line);
263            }
264        } else {
265            break;
266        }
267    }
268}
269
270impl TerraformApply {
271    async fn output(&mut self) -> Result<TerraformResult> {
272        let (_, child) = self.child.as_ref().unwrap().clone();
273        let mut stdout = child.write().unwrap().stdout.take().unwrap();
274        let stderr = child.write().unwrap().stderr.take().unwrap();
275
276        let status = tokio::task::spawn_blocking(move || {
277            // it is okay for this thread to keep running even if the future is cancelled
278            child.write().unwrap().wait().unwrap()
279        });
280
281        let display_apply = display_apply_outputs(&mut stdout);
282        let tf_name = terraform_name();
283        let stderr_loop = tokio::task::spawn_blocking(move || {
284            let mut lines = BufReader::new(stderr).lines();
285            while let Some(Ok(line)) = lines.next() {
286                ProgressTracker::println(format!("[{}] {}", tf_name, line));
287            }
288        });
289
290        let _ = futures::join!(display_apply, stderr_loop);
291
292        let status = status.await;
293
294        self.child = None;
295
296        if !status.unwrap().success() {
297            bail!(
298                "{tf} deployment failed, see `[{tf}]` logs above.",
299                tf = terraform_name(),
300            );
301        }
302
303        let mut output_command = terraform_command();
304        output_command
305            .current_dir(self.deployment_folder.as_ref().unwrap().path())
306            .arg("output")
307            .arg("-json");
308
309        #[cfg(unix)]
310        {
311            output_command.process_group(0);
312        }
313
314        let output = output_command
315            .output()
316            .with_context(|| format!("Failed to read {} outputs", terraform_name()))?;
317
318        Ok(TerraformResult {
319            outputs: serde_json::from_slice(&output.stdout).unwrap(),
320            deployment_folder: self.deployment_folder.take(),
321        })
322    }
323}
324
325fn destroy_deployment(deployment_folder: TempDir) {
326    println!(
327        "Destroying {} deployment at {}",
328        terraform_name(),
329        deployment_folder.path().display()
330    );
331
332    let mut destroy_command = terraform_command();
333    destroy_command
334        .current_dir(deployment_folder.path())
335        .arg("destroy")
336        .arg("-auto-approve")
337        .arg("-no-color")
338        .arg("-parallelism=128")
339        .stdout(Stdio::piped());
340
341    #[cfg(unix)]
342    {
343        destroy_command.process_group(0);
344    }
345
346    let mut destroy_child = destroy_command
347        .spawn()
348        .unwrap_or_else(|_| panic!("Failed to spawn {} destroy command", terraform_name()));
349
350    filter_terraform_logs(&mut destroy_child);
351
352    if !destroy_child
353        .wait()
354        .unwrap_or_else(|_| panic!("Failed to destroy {} deployment", terraform_name()))
355        .success()
356    {
357        // prevent the folder from being deleted
358        let _ = deployment_folder.keep();
359        eprintln!("WARNING: failed to destroy {} deployment", terraform_name());
360    }
361}
362
363impl Drop for TerraformApply {
364    fn drop(&mut self) {
365        if let Some((pid, child)) = self.child.take() {
366            #[cfg(unix)]
367            nix::sys::signal::kill(
368                nix::unistd::Pid::from_raw(pid as i32),
369                nix::sys::signal::Signal::SIGINT,
370            )
371            .unwrap();
372            #[cfg(not(unix))]
373            let _ = pid;
374
375            let mut child_write = child.write().unwrap();
376            if child_write.try_wait().unwrap().is_none() {
377                println!("Waiting for Terraform apply to finish...");
378                child_write.wait().unwrap();
379            }
380        }
381
382        if let Some(deployment_folder) = self.deployment_folder.take() {
383            destroy_deployment(deployment_folder);
384        }
385    }
386}
387
388#[derive(Serialize, Deserialize)]
389pub struct TerraformConfig {
390    pub required_providers: HashMap<String, TerraformProvider>,
391}
392
393#[derive(Serialize, Deserialize)]
394pub struct TerraformProvider {
395    pub source: String,
396    pub version: String,
397}
398
399#[derive(Serialize, Deserialize, Debug)]
400pub struct TerraformOutput {
401    pub value: String,
402}
403
404#[derive(Debug)]
405pub struct TerraformResult {
406    pub outputs: HashMap<String, TerraformOutput>,
407    /// `None` if no deployment was performed
408    pub deployment_folder: Option<TempDir>,
409}
410
411impl Drop for TerraformResult {
412    fn drop(&mut self) {
413        if let Some(deployment_folder) = self.deployment_folder.take() {
414            destroy_deployment(deployment_folder);
415        }
416    }
417}
418
419#[derive(Serialize, Deserialize)]
420pub struct TerraformResultOutput {
421    value: String,
422}