1use std::collections::HashMap;
2use std::ffi::OsStr;
3use std::io::{BufRead, BufReader};
4#[cfg(unix)]
5use std::os::unix::process::CommandExt;
6use std::path::PathBuf;
7use std::process::{Child, ChildStdout, Command};
8use std::sync::{Arc, OnceLock, RwLock};
9
10use anyhow::{Context, Result, bail};
11use async_process::Stdio;
12use serde::{Deserialize, Serialize};
13use tempfile::TempDir;
14
15use super::progress::ProgressTracker;
16
/// 16-character, hex-style alphabet (digits `1`-`9`, `0`, then `a`-`f`).
/// NOTE(review): not referenced in this file — presumably used by callers to
/// generate randomized, Terraform-safe name suffixes; confirm at call sites.
pub static TERRAFORM_ALPHABET: [char; 16] = [
    '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b', 'c', 'd', 'e', 'f',
];
20
/// Process-wide cache of the resolved Terraform-compatible binary path
/// (`tofu` or `terraform`); populated on first use by `get_terraform_path`.
static TERRAFORM_PATH: OnceLock<PathBuf> = OnceLock::new();
22
23fn get_terraform_path() -> &'static PathBuf {
27 TERRAFORM_PATH.get_or_init(|| {
28 which::which("tofu")
29 .or_else(|_| which::which("terraform"))
30 .expect("Neither `tofu` nor `terraform` found in PATH. Please install one of them.")
31 })
32}
33
34fn terraform_command() -> Command {
35 Command::new(get_terraform_path())
36}
37
38pub fn terraform_name() -> &'static str {
40 get_terraform_path()
41 .file_name()
42 .and_then(OsStr::to_str)
43 .unwrap_or("terraform")
44}
45
/// Tracks in-flight `terraform apply` invocations so they can be checked for
/// release when the pool is dropped.
#[derive(Default)]
pub struct TerraformPool {
    // Monotonically increasing id handed out by `create_apply`; also the map key.
    counter: u32,
    // Applies currently registered, keyed by the id returned from `create_apply`.
    active_applies: HashMap<u32, Arc<tokio::sync::RwLock<TerraformApply>>>,
}
52
53impl TerraformPool {
54 fn create_apply(
55 &mut self,
56 deployment_folder: TempDir,
57 ) -> Result<(u32, Arc<tokio::sync::RwLock<TerraformApply>>)> {
58 let next_counter = self.counter;
59 self.counter += 1;
60
61 let mut apply_command = terraform_command();
62
63 apply_command
64 .current_dir(deployment_folder.path())
65 .arg("apply")
66 .arg("-auto-approve")
67 .arg("-no-color")
68 .arg("-parallelism=128");
69
70 #[cfg(unix)]
71 {
72 apply_command.process_group(0);
73 }
74
75 let spawned_child = apply_command
76 .stdout(Stdio::piped())
77 .stderr(Stdio::piped())
78 .spawn()
79 .with_context(|| format!("Failed to spawn `{}`. Is it installed?", terraform_name()))?;
80
81 let spawned_id = spawned_child.id();
82
83 let deployment = Arc::new(tokio::sync::RwLock::new(TerraformApply {
84 child: Some((spawned_id, Arc::new(RwLock::new(spawned_child)))),
85 deployment_folder: Some(deployment_folder),
86 }));
87
88 self.active_applies.insert(next_counter, deployment.clone());
89
90 Ok((next_counter, deployment))
91 }
92
93 fn drop_apply(&mut self, counter: u32) {
94 self.active_applies.remove(&counter);
95 }
96}
97
98impl Drop for TerraformPool {
99 fn drop(&mut self) {
100 #[expect(
101 clippy::disallowed_methods,
102 reason = "nondeterministic iteration order, fine for assertions"
103 )]
104 for (_, apply) in self.active_applies.drain() {
105 debug_assert_eq!(Arc::strong_count(&apply), 1);
106 }
107 }
108}
109
/// In-memory representation of a Terraform configuration; serialized to
/// `main.tf.json` (Terraform's JSON configuration syntax) before provisioning.
#[derive(Serialize, Deserialize)]
pub struct TerraformBatch {
    pub terraform: TerraformConfig,
    /// Provider configuration blocks, keyed by provider name.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub provider: HashMap<String, serde_json::Value>,
    /// Data sources: data-source type -> (name -> body).
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub data: HashMap<String, HashMap<String, serde_json::Value>>,
    /// Resources: resource type -> (name -> body).
    pub resource: HashMap<String, HashMap<String, serde_json::Value>>,
    /// Output values read back after apply via `output -json`.
    pub output: HashMap<String, TerraformOutput>,
}
120
121impl Default for TerraformBatch {
122 fn default() -> TerraformBatch {
123 TerraformBatch {
124 terraform: TerraformConfig {
125 required_providers: HashMap::new(),
126 },
127 provider: HashMap::new(),
128 data: HashMap::new(),
129 resource: HashMap::new(),
130 output: HashMap::new(),
131 }
132 }
133}
134
impl TerraformBatch {
    /// Serializes this batch to `main.tf.json` inside a fresh temp dir under
    /// `./.hydro`, runs `init` then `apply` (tracked via [`ProgressTracker`]),
    /// and returns the deployment's outputs.
    ///
    /// A fully empty batch short-circuits to an empty [`TerraformResult`]
    /// without touching the filesystem or spawning any processes.
    ///
    /// # Errors
    /// Fails if `init` cannot be spawned or exits unsuccessfully, or if the
    /// apply itself fails.
    pub async fn provision(self, pool: &mut TerraformPool) -> Result<TerraformResult> {
        // Rebind the reference so the async closure below captures it by move.
        // NOTE(review): `identity` looks redundant but appears to exist for
        // this capture behavior — confirm before simplifying.
        let pool = std::convert::identity(pool);

        // Nothing to provision: return an empty result with no folder to destroy.
        if self.terraform.required_providers.is_empty()
            && self.resource.is_empty()
            && self.data.is_empty()
            && self.output.is_empty()
        {
            return Ok(TerraformResult {
                outputs: HashMap::new(),
                deployment_folder: None,
            });
        }

        ProgressTracker::with_group(terraform_name(), Some(1), || async {
            // Keep deployment temp dirs under `.hydro` in the working directory.
            let dothydro_folder = std::env::current_dir().unwrap().join(".hydro");
            std::fs::create_dir_all(&dothydro_folder).unwrap();
            let deployment_folder = tempfile::tempdir_in(dothydro_folder).unwrap();

            // Emit the batch as Terraform JSON configuration.
            std::fs::write(
                deployment_folder.path().join("main.tf.json"),
                serde_json::to_string(&self).unwrap(),
            )
            .unwrap();

            // `init` downloads providers; its stdout is discarded.
            if !terraform_command()
                .current_dir(deployment_folder.path())
                .arg("init")
                .stdout(Stdio::null())
                .spawn()
                .with_context(|| {
                    format!("Failed to spawn `{}`. Is it installed?", terraform_name())
                })?
                .wait()
                .with_context(|| format!("Failed to launch {} init command", terraform_name()))?
                .success()
            {
                bail!("Failed to initialize {}", terraform_name());
            }

            let (apply_id, apply) = pool.create_apply(deployment_folder)?;

            // Progress total = number of resources across all resource types.
            let output = ProgressTracker::with_group(
                "apply",
                #[expect(
                    clippy::disallowed_methods,
                    reason = "nondeterministic iteration order, fine for addition"
                )]
                Some(self.resource.values().map(|r| r.len()).sum()),
                || async { apply.write().await.output().await },
            )
            .await;
            // Unregister from the pool whether the apply succeeded or failed.
            pool.drop_apply(apply_id);
            output
        })
        .await
    }
}
197
/// A running `terraform apply` child process plus the temp dir holding its
/// configuration and state.
struct TerraformApply {
    // `(pid, handle)` of the live child; `None` once the apply has completed.
    // The pid is stored separately so `Drop` can signal the process without
    // locking the child handle.
    child: Option<(u32, Arc<RwLock<Child>>)>,
    // Folder containing `main.tf.json`; `Some` until ownership is handed to
    // the `TerraformResult` (whose `Drop` destroys the deployment).
    deployment_folder: Option<TempDir>,
}
202
/// Streams `terraform apply` stdout, turning per-resource "Creating..." /
/// "Creation complete" lines into [`ProgressTracker`] leaf entries.
///
/// Only lines shaped like `id: message` — exactly one `:`, no spaces before
/// it — are inspected; everything else is silently ignored. Recognized but
/// uninteresting status lines are skipped; any other `id: message` line is a
/// hard panic so format changes in Terraform's output are noticed.
///
/// NOTE(review): this performs blocking reads on `ChildStdout` inside an
/// async fn; confirm callers run it where blocking the executor is acceptable.
async fn display_apply_outputs(stdout: &mut ChildStdout) {
    let lines = BufReader::new(stdout).lines();
    // resource id -> (completion signal, spawned progress-leaf task)
    let mut waiting_for_result = HashMap::new();

    for line in lines {
        if let Ok(line) = line {
            let mut split = line.split(':');
            // Gate: the text before the only `:` must contain no spaces.
            if let Some(first) = split.next()
                && first.chars().all(|c| c != ' ')
                && split.next().is_some()
                && split.next().is_none()
            {
                if line.starts_with("Plan:")
                    || line.starts_with("Outputs:")
                    || line.contains(": Still creating...")
                    || line.contains(": Reading...")
                    || line.contains(": Still reading...")
                    || line.contains(": Read complete after")
                {
                    // Known-benign status lines: nothing to display.
                } else if line.ends_with(": Creating...") {
                    let id = line.split(':').next().unwrap().trim();
                    // The leaf task lives until the oneshot fires on
                    // "Creation complete".
                    let (channel_send, channel_recv) = tokio::sync::oneshot::channel();
                    waiting_for_result.insert(
                        id.to_owned(),
                        (
                            channel_send,
                            tokio::task::spawn(ProgressTracker::leaf(id.to_owned(), async move {
                                let _result = channel_recv.await;
                            })),
                        ),
                    );
                } else if line.contains(": Creation complete after") {
                    let id = line.split(':').next().unwrap().trim();
                    // Panics if Terraform reports completion for a resource we
                    // never saw start — treated as an invariant violation.
                    let (sender, to_await) = waiting_for_result.remove(id).unwrap();
                    let _ = sender.send(());
                    to_await.await.unwrap();
                } else {
                    panic!("Unexpected from {}: {}", terraform_name(), line);
                }
            }
        } else {
            // Stop at the first read error (e.g. the pipe closed).
            break;
        }
    }
}
250
251fn filter_terraform_logs(child: &mut Child) {
252 let lines = BufReader::new(child.stdout.take().unwrap()).lines();
253 let tf_name = terraform_name();
254 for line in lines {
255 if let Ok(line) = line {
256 let mut split = line.split(':');
257 if let Some(first) = split.next()
258 && first.chars().all(|c| c != ' ')
259 && split.next().is_some()
260 && split.next().is_none()
261 {
262 eprintln!("[{}] {}", tf_name, line);
263 }
264 } else {
265 break;
266 }
267 }
268}
269
impl TerraformApply {
    /// Waits for the spawned `terraform apply` to finish while streaming its
    /// stdout (as progress entries) and stderr (as log lines), then reads the
    /// deployment's outputs via `output -json`.
    ///
    /// # Errors
    /// Fails if the apply exits unsuccessfully or if the outputs cannot be
    /// read.
    ///
    /// # Panics
    /// Panics if called when `self.child` is `None`, or if the child's
    /// stdout/stderr were already taken.
    async fn output(&mut self) -> Result<TerraformResult> {
        let (_, child) = self.child.as_ref().unwrap().clone();
        // Take the pipes up front so the blocking `wait` below can hold the
        // child's write lock without blocking the stream readers.
        let mut stdout = child.write().unwrap().stdout.take().unwrap();
        let stderr = child.write().unwrap().stderr.take().unwrap();

        // Blocking wait on a worker thread; runs concurrently with the
        // stdout/stderr streaming below.
        // NOTE(review): this holds the `RwLock<Child>` write guard for the
        // whole wait, so `Drop`'s `try_wait` would contend — confirm intended.
        let status = tokio::task::spawn_blocking(move || {
            child.write().unwrap().wait().unwrap()
        });

        let display_apply = display_apply_outputs(&mut stdout);
        let tf_name = terraform_name();
        let stderr_loop = tokio::task::spawn_blocking(move || {
            let mut lines = BufReader::new(stderr).lines();
            while let Some(Ok(line)) = lines.next() {
                ProgressTracker::println(format!("[{}] {}", tf_name, line));
            }
        });

        // Drain both streams to completion before checking the exit status.
        let _ = futures::join!(display_apply, stderr_loop);

        let status = status.await;

        // The child has exited; drop our handle so `Drop` won't signal it.
        self.child = None;

        if !status.unwrap().success() {
            bail!(
                "{tf} deployment failed, see `[{tf}]` logs above.",
                tf = terraform_name(),
            );
        }

        // Read back the declared outputs as JSON.
        let mut output_command = terraform_command();
        output_command
            .current_dir(self.deployment_folder.as_ref().unwrap().path())
            .arg("output")
            .arg("-json");

        #[cfg(unix)]
        {
            output_command.process_group(0);
        }

        let output = output_command
            .output()
            .with_context(|| format!("Failed to read {} outputs", terraform_name()))?;

        Ok(TerraformResult {
            outputs: serde_json::from_slice(&output.stdout).unwrap(),
            // Transfer folder ownership: the result's `Drop` now destroys it.
            deployment_folder: self.deployment_folder.take(),
        })
    }
}
324
325fn destroy_deployment(deployment_folder: TempDir) {
326 println!(
327 "Destroying {} deployment at {}",
328 terraform_name(),
329 deployment_folder.path().display()
330 );
331
332 let mut destroy_command = terraform_command();
333 destroy_command
334 .current_dir(deployment_folder.path())
335 .arg("destroy")
336 .arg("-auto-approve")
337 .arg("-no-color")
338 .arg("-parallelism=128")
339 .stdout(Stdio::piped());
340
341 #[cfg(unix)]
342 {
343 destroy_command.process_group(0);
344 }
345
346 let mut destroy_child = destroy_command
347 .spawn()
348 .unwrap_or_else(|_| panic!("Failed to spawn {} destroy command", terraform_name()));
349
350 filter_terraform_logs(&mut destroy_child);
351
352 if !destroy_child
353 .wait()
354 .unwrap_or_else(|_| panic!("Failed to destroy {} deployment", terraform_name()))
355 .success()
356 {
357 let _ = deployment_folder.keep();
359 eprintln!("WARNING: failed to destroy {} deployment", terraform_name());
360 }
361}
362
impl Drop for TerraformApply {
    /// Best-effort cancellation: interrupt a still-running apply, wait for it
    /// to wind down, then destroy whatever it managed to provision.
    fn drop(&mut self) {
        if let Some((pid, child)) = self.child.take() {
            // Send SIGINT so Terraform can shut down cleanly and release its
            // state lock. The child was detached into its own process group
            // in `create_apply`, so it does not receive the terminal's own
            // Ctrl-C — we must signal it explicitly here.
            #[cfg(unix)]
            nix::sys::signal::kill(
                nix::unistd::Pid::from_raw(pid as i32),
                nix::sys::signal::Signal::SIGINT,
            )
            .unwrap();
            // No signal support off-Unix; silence the unused-variable warning.
            #[cfg(not(unix))]
            let _ = pid;

            // If the process is still running after the interrupt, block
            // until it exits so state files are not left mid-write.
            let mut child_write = child.write().unwrap();
            if child_write.try_wait().unwrap().is_none() {
                println!("Waiting for Terraform apply to finish...");
                child_write.wait().unwrap();
            }
        }

        // If ownership of the folder was never transferred to a result
        // (e.g. the apply was interrupted), tear the deployment down here.
        if let Some(deployment_folder) = self.deployment_folder.take() {
            destroy_deployment(deployment_folder);
        }
    }
}
387
/// The top-level `terraform { ... }` block of the generated configuration.
#[derive(Serialize, Deserialize)]
pub struct TerraformConfig {
    /// Providers (name -> source/version) that `init` must install.
    pub required_providers: HashMap<String, TerraformProvider>,
}
392
393#[derive(Serialize, Deserialize)]
394pub struct TerraformProvider {
395 pub source: String,
396 pub version: String,
397}
398
/// An `output` block in the generated config; also the shape each output is
/// deserialized into from `output -json` after apply.
/// NOTE(review): only a string `value` is captured here, so non-string
/// outputs would fail to deserialize — confirm all declared outputs are
/// strings.
#[derive(Serialize, Deserialize, Debug)]
pub struct TerraformOutput {
    pub value: String,
}
403
/// Outcome of a successful provision: resolved output values plus ownership
/// of the deployment folder.
#[derive(Debug)]
pub struct TerraformResult {
    pub outputs: HashMap<String, TerraformOutput>,
    /// `None` when nothing was provisioned (empty batch); when `Some`, the
    /// deployment is destroyed when this result is dropped.
    pub deployment_folder: Option<TempDir>,
}
410
411impl Drop for TerraformResult {
412 fn drop(&mut self) {
413 if let Some(deployment_folder) = self.deployment_folder.take() {
414 destroy_deployment(deployment_folder);
415 }
416 }
417}
418
/// JSON shape of a single entry produced by `terraform output -json`.
/// NOTE(review): appears unused within this file — confirm external callers
/// before removing.
#[derive(Serialize, Deserialize)]
pub struct TerraformResultOutput {
    // Raw output value as a string.
    value: String,
}