Skip to main content

hydro_deploy/
aws.rs

1use std::any::Any;
2use std::fmt::Debug;
3use std::sync::{Arc, Mutex, OnceLock};
4
5use anyhow::Result;
6use nanoid::nanoid;
7use serde_json::json;
8
9use super::terraform::{TERRAFORM_ALPHABET, TerraformOutput, TerraformProvider};
10use super::{ClientStrategy, Host, HostTargetType, LaunchedHost, ResourceBatch, ResourceResult};
11use crate::ssh::LaunchedSshHost;
12use crate::{BaseServerStrategy, HostStrategyGetter, PortNetworkHint};
13
14pub struct LaunchedEc2Instance {
15    resource_result: Arc<ResourceResult>,
16    user: String,
17    pub internal_ip: String,
18    pub external_ip: Option<String>,
19}
20
21impl LaunchedSshHost for LaunchedEc2Instance {
22    fn get_external_ip(&self) -> Option<&str> {
23        self.external_ip.as_deref()
24    }
25
26    fn get_internal_ip(&self) -> &str {
27        &self.internal_ip
28    }
29
30    fn get_cloud_provider(&self) -> &'static str {
31        "AWS"
32    }
33
34    fn resource_result(&self) -> &Arc<ResourceResult> {
35        &self.resource_result
36    }
37
38    fn ssh_user(&self) -> &str {
39        self.user.as_str()
40    }
41}
42
/// A VPC for EC2 hosts: either a pre-existing one (by VPC id) or one declared on demand.
#[derive(Debug)]
pub struct AwsNetwork {
    /// Region used when configuring the `aws` Terraform provider.
    pub region: String,
    /// Holds a user-supplied VPC id, or the Terraform key of the VPC declared by the first
    /// resource collection.
    pub existing_vpc: OnceLock<String>,
    /// Random suffix that keeps Terraform keys and resource names unique.
    id: String,
}
49
50impl AwsNetwork {
51    pub fn new(region: impl Into<String>, existing_vpc: Option<String>) -> Arc<Self> {
52        Arc::new(Self {
53            region: region.into(),
54            existing_vpc: existing_vpc.map(From::from).unwrap_or_default(),
55            id: nanoid!(8, &TERRAFORM_ALPHABET),
56        })
57    }
58
59    fn collect_resources(&self, resource_batch: &mut ResourceBatch) -> String {
60        resource_batch
61            .terraform
62            .terraform
63            .required_providers
64            .insert(
65                "aws".to_owned(),
66                TerraformProvider {
67                    source: "hashicorp/aws".to_owned(),
68                    version: "5.0.0".to_owned(),
69                },
70            );
71
72        resource_batch.terraform.provider.insert(
73            "aws".to_owned(),
74            json!({
75                "region": self.region
76            }),
77        );
78
79        let vpc_network = format!("hydro-vpc-network-{}", self.id);
80
81        if let Some(existing) = self.existing_vpc.get() {
82            if resource_batch
83                .terraform
84                .resource
85                .get("aws_vpc")
86                .is_some_and(|map| map.contains_key(existing))
87            {
88                format!("aws_vpc.{existing}")
89            } else {
90                resource_batch
91                    .terraform
92                    .data
93                    .entry("aws_vpc".to_owned())
94                    .or_default()
95                    .insert(
96                        vpc_network.clone(),
97                        json!({
98                            "id": existing,
99                        }),
100                    );
101
102                format!("data.aws_vpc.{vpc_network}")
103            }
104        } else {
105            resource_batch
106                .terraform
107                .resource
108                .entry("aws_vpc".to_owned())
109                .or_default()
110                .insert(
111                    vpc_network.clone(),
112                    json!({
113                        "cidr_block": "10.0.0.0/16",
114                        "enable_dns_hostnames": true,
115                        "enable_dns_support": true,
116                        "tags": {
117                            "Name": vpc_network
118                        }
119                    }),
120                );
121
122            // Create internet gateway
123            let igw_key = format!("{vpc_network}-igw");
124            resource_batch
125                .terraform
126                .resource
127                .entry("aws_internet_gateway".to_owned())
128                .or_default()
129                .insert(
130                    igw_key.clone(),
131                    json!({
132                        "vpc_id": format!("${{aws_vpc.{}.id}}", vpc_network),
133                        "tags": {
134                            "Name": igw_key
135                        }
136                    }),
137                );
138
139            // Create subnet
140            let subnet_key = format!("{vpc_network}-subnet");
141            resource_batch
142                .terraform
143                .resource
144                .entry("aws_subnet".to_owned())
145                .or_default()
146                .insert(
147                    subnet_key.clone(),
148                    json!({
149                        "vpc_id": format!("${{aws_vpc.{}.id}}", vpc_network),
150                        "cidr_block": "10.0.1.0/24",
151                        "availability_zone": format!("{}a", self.region),
152                        "map_public_ip_on_launch": true,
153                        "tags": {
154                            "Name": subnet_key
155                        }
156                    }),
157                );
158
159            // Create route table
160            let rt_key = format!("{vpc_network}-rt");
161            resource_batch
162                .terraform
163                .resource
164                .entry("aws_route_table".to_owned())
165                .or_default()
166                .insert(
167                    rt_key.clone(),
168                    json!({
169                        "vpc_id": format!("${{aws_vpc.{}.id}}", vpc_network),
170                        "tags": {
171                            "Name": rt_key
172                        }
173                    }),
174                );
175
176            // Create route
177            resource_batch
178                .terraform
179                .resource
180                .entry("aws_route".to_owned())
181                .or_default()
182                .insert(
183                    format!("{vpc_network}-route"),
184                    json!({
185                        "route_table_id": format!("${{aws_route_table.{}.id}}", rt_key),
186                        "destination_cidr_block": "0.0.0.0/0",
187                        "gateway_id": format!("${{aws_internet_gateway.{}.id}}", igw_key)
188                    }),
189                );
190
191            resource_batch
192                .terraform
193                .resource
194                .entry("aws_route_table_association".to_owned())
195                .or_default()
196                .insert(
197                    format!("{vpc_network}-rta"),
198                    json!({
199                        "subnet_id": format!("${{aws_subnet.{}.id}}", subnet_key),
200                        "route_table_id": format!("${{aws_route_table.{}.id}}", rt_key)
201                    }),
202                );
203
204            // Create security group that allows internal communication
205            let sg_key = format!("{vpc_network}-default-sg");
206            resource_batch
207                .terraform
208                .resource
209                .entry("aws_security_group".to_owned())
210                .or_default()
211                .insert(
212                    sg_key,
213                    json!({
214                        "name": format!("{vpc_network}-default-allow-internal"),
215                        "description": "Allow internal communication between instances",
216                        "vpc_id": format!("${{aws_vpc.{}.id}}", vpc_network),
217                        "ingress": [
218                            {
219                                "from_port": 0,
220                                "to_port": 65535,
221                                "protocol": "tcp",
222                                "cidr_blocks": ["10.0.0.0/16"],
223                                "description": "Allow all TCP traffic within VPC",
224                                "ipv6_cidr_blocks": [],
225                                "prefix_list_ids": [],
226                                "security_groups": [],
227                                "self": false
228                            },
229                            {
230                                "from_port": 0,
231                                "to_port": 65535,
232                                "protocol": "udp",
233                                "cidr_blocks": ["10.0.0.0/16"],
234                                "description": "Allow all UDP traffic within VPC",
235                                "ipv6_cidr_blocks": [],
236                                "prefix_list_ids": [],
237                                "security_groups": [],
238                                "self": false
239                            },
240                            {
241                                "from_port": -1,
242                                "to_port": -1,
243                                "protocol": "icmp",
244                                "cidr_blocks": ["10.0.0.0/16"],
245                                "description": "Allow ICMP within VPC",
246                                "ipv6_cidr_blocks": [],
247                                "prefix_list_ids": [],
248                                "security_groups": [],
249                                "self": false
250                            }
251                        ],
252                        "egress": [
253                            {
254                                "from_port": 0,
255                                "to_port": 0,
256                                "protocol": "-1",
257                                "cidr_blocks": ["0.0.0.0/0"],
258                                "description": "Allow all outbound traffic",
259                                "ipv6_cidr_blocks": [],
260                                "prefix_list_ids": [],
261                                "security_groups": [],
262                                "self": false
263                            }
264                        ]
265                    }),
266                );
267
268            let out = format!("aws_vpc.{vpc_network}");
269            self.existing_vpc.set(vpc_network).unwrap();
270            out
271        }
272    }
273}
274
/// An IAM role, its policy attachments, and the instance profile wrapping it, for use by one or
/// more EC2 instances.
#[derive(Debug)]
pub struct AwsEc2IamInstanceProfile {
    /// Region used when configuring the `aws` Terraform provider.
    pub region: String,
    /// Either the name of a pre-existing instance profile (given at construction) or, once the
    /// profile has been declared, its Terraform key.
    pub existing_instance_profile_key_or_name: Option<String>,
    /// ARNs of the policies to attach to the role.
    pub policy_arns: Vec<String>,
    /// Random suffix that keeps Terraform keys and resource names unique.
    id: String,
}
283
284impl AwsEc2IamInstanceProfile {
285    /// Creates a new instance. If `existing_instance_profile_name` is `Some`, that will be used as the instance
286    /// profile name which must already exist in the AWS account.
287    pub fn new(region: impl Into<String>, existing_instance_profile_name: Option<String>) -> Self {
288        Self {
289            region: region.into(),
290            existing_instance_profile_key_or_name: existing_instance_profile_name,
291            policy_arns: Default::default(),
292            id: nanoid!(8, &TERRAFORM_ALPHABET),
293        }
294    }
295
296    /// Permits the given ARN.
297    pub fn add_policy_arn(mut self, policy_arn: impl Into<String>) -> Self {
298        if self.existing_instance_profile_key_or_name.is_some() {
299            panic!("Adding an ARN to an existing instance profile is not supported.");
300        }
301        self.policy_arns.push(policy_arn.into());
302        self
303    }
304
305    /// Enables running and emitting telemetry via the CloudWatch agent.
306    pub fn add_cloudwatch_agent_server_policy_arn(self) -> Self {
307        self.add_policy_arn("arn:aws:iam::aws:policy/CloudWatchAgentServerPolicy")
308    }
309
310    fn collect_resources(&mut self, resource_batch: &mut ResourceBatch) -> String {
311        const RESOURCE_AWS_IAM_INSTANCE_PROFILE: &str = "aws_iam_instance_profile";
312        const RESOURCE_AWS_IAM_ROLE_POLICY_ATTACHMENT: &str = "aws_iam_role_policy_attachment";
313        const RESOURCE_AWS_IAM_ROLE: &str = "aws_iam_role";
314
315        resource_batch
316            .terraform
317            .terraform
318            .required_providers
319            .insert(
320                "aws".to_owned(),
321                TerraformProvider {
322                    source: "hashicorp/aws".to_owned(),
323                    version: "5.0.0".to_owned(),
324                },
325            );
326
327        resource_batch.terraform.provider.insert(
328            "aws".to_owned(),
329            json!({
330                "region": self.region
331            }),
332        );
333
334        let instance_profile_key = format!("hydro-instance-profile-{}", self.id);
335
336        if let Some(existing) = self.existing_instance_profile_key_or_name.as_ref() {
337            if resource_batch
338                .terraform
339                .resource
340                .get(RESOURCE_AWS_IAM_INSTANCE_PROFILE)
341                .is_some_and(|map| map.contains_key(existing))
342            {
343                // `existing` is a key.
344                format!("{RESOURCE_AWS_IAM_INSTANCE_PROFILE}.{existing}")
345            } else {
346                // `existing` is a name of an existing resource, supplied when constructed.
347                resource_batch
348                    .terraform
349                    .data
350                    .entry(RESOURCE_AWS_IAM_INSTANCE_PROFILE.to_owned())
351                    .or_default()
352                    .insert(
353                        instance_profile_key.clone(),
354                        json!({
355                            "id": existing,
356                        }),
357                    );
358
359                format!("data.{RESOURCE_AWS_IAM_INSTANCE_PROFILE}.{instance_profile_key}")
360            }
361        } else {
362            // Create the role (permissions set after).
363            let iam_role_key = format!("{instance_profile_key}-iam-role");
364            resource_batch
365                .terraform
366                .resource
367                .entry(RESOURCE_AWS_IAM_ROLE.to_owned())
368                .or_default()
369                .insert(
370                    iam_role_key.clone(),
371                    json!({
372                        "name": format!("hydro-iam-role-{}", self.id),
373                        "assume_role_policy": json!({
374                            "Version": "2012-10-17",
375                            "Statement": [
376                                {
377                                    "Action": "sts:AssumeRole",
378                                    "Effect": "Allow",
379                                    "Principal": {
380                                        "Service": "ec2.amazonaws.com"
381                                    }
382                                }
383                            ]
384                        }).to_string(),
385                    }),
386                );
387
388            // Attach permissions
389            for (i, policy_arn) in self.policy_arns.iter().enumerate() {
390                let policy_attachment_key = format!("{iam_role_key}-policy-attachment-{i}");
391                resource_batch
392                    .terraform
393                    .resource
394                    .entry(RESOURCE_AWS_IAM_ROLE_POLICY_ATTACHMENT.to_owned())
395                    .or_default()
396                    .insert(
397                        policy_attachment_key,
398                        json!({
399                            "policy_arn": policy_arn,
400                            "role": format!("${{{RESOURCE_AWS_IAM_ROLE}.{iam_role_key}.name}}"),
401                        }),
402                    );
403            }
404
405            // Create instance profile. This is what attaches to EC2 instances.
406            resource_batch
407                .terraform
408                .resource
409                .entry(RESOURCE_AWS_IAM_INSTANCE_PROFILE.to_owned())
410                .or_default()
411                .insert(
412                    instance_profile_key.clone(),
413                    json!({
414                        "name": format!("hydro-instance-profile-{}", self.id),
415                        "role": format!("${{{RESOURCE_AWS_IAM_ROLE}.{iam_role_key}.name}}"),
416                    }),
417                );
418
419            // Set key
420            self.existing_instance_profile_key_or_name = Some(instance_profile_key.clone());
421
422            format!("{RESOURCE_AWS_IAM_INSTANCE_PROFILE}.{instance_profile_key}")
423        }
424    }
425}
426
/// A CloudWatch log group, either pre-existing (by name) or declared on demand.
#[derive(Debug)]
pub struct AwsCloudwatchLogGroup {
    /// Region used when configuring the `aws` Terraform provider.
    pub region: String,
    /// Either the name of a pre-existing log group (given at construction) or, once the log group
    /// has been declared, its Terraform key.
    pub existing_cloudwatch_log_group_key_or_name: Option<String>,
    /// Random suffix that keeps Terraform keys and resource names unique.
    id: String,
}
434
435impl AwsCloudwatchLogGroup {
436    /// Creates a new instance. If `existing_cloudwatch_log_group_name` is `Some`, that will be used as the CloudWatch
437    /// log group name which must already exist in the AWS account and region.
438    pub fn new(
439        region: impl Into<String>,
440        existing_cloudwatch_log_group_name: Option<String>,
441    ) -> Self {
442        Self {
443            region: region.into(),
444            existing_cloudwatch_log_group_key_or_name: existing_cloudwatch_log_group_name,
445            id: nanoid!(8, &TERRAFORM_ALPHABET),
446        }
447    }
448
449    fn collect_resources(&mut self, resource_batch: &mut ResourceBatch) -> String {
450        const RESOURCE_AWS_CLOUDWATCH_LOG_GROUP: &str = "aws_cloudwatch_log_group";
451
452        resource_batch
453            .terraform
454            .terraform
455            .required_providers
456            .insert(
457                "aws".to_owned(),
458                TerraformProvider {
459                    source: "hashicorp/aws".to_owned(),
460                    version: "5.0.0".to_owned(),
461                },
462            );
463
464        resource_batch.terraform.provider.insert(
465            "aws".to_owned(),
466            json!({
467                "region": self.region
468            }),
469        );
470
471        let cloudwatch_log_group_key = format!("hydro-cloudwatch-log-group-{}", self.id);
472
473        if let Some(existing) = self.existing_cloudwatch_log_group_key_or_name.as_ref() {
474            if resource_batch
475                .terraform
476                .resource
477                .get(RESOURCE_AWS_CLOUDWATCH_LOG_GROUP)
478                .is_some_and(|map| map.contains_key(existing))
479            {
480                // `existing` is a key.
481                format!("{RESOURCE_AWS_CLOUDWATCH_LOG_GROUP}.{existing}")
482            } else {
483                // `existing` is a name of an existing resource, supplied when constructed.
484                resource_batch
485                    .terraform
486                    .data
487                    .entry(RESOURCE_AWS_CLOUDWATCH_LOG_GROUP.to_owned())
488                    .or_default()
489                    .insert(
490                        cloudwatch_log_group_key.clone(),
491                        json!({
492                            "id": existing,
493                        }),
494                    );
495
496                format!("data.{RESOURCE_AWS_CLOUDWATCH_LOG_GROUP}.{cloudwatch_log_group_key}")
497            }
498        } else {
499            // Create the log group.
500            resource_batch
501                .terraform
502                .resource
503                .entry(RESOURCE_AWS_CLOUDWATCH_LOG_GROUP.to_owned())
504                .or_default()
505                .insert(
506                    cloudwatch_log_group_key.clone(),
507                    json!({
508                        "name": format!("hydro-cloudwatch-log-group-{}", self.id),
509                        "retention_in_days": 1,
510                    }),
511                );
512
513            // Set key
514            self.existing_cloudwatch_log_group_key_or_name = Some(cloudwatch_log_group_key.clone());
515
516            format!("{RESOURCE_AWS_CLOUDWATCH_LOG_GROUP}.{cloudwatch_log_group_key}")
517        }
518    }
519}
520
521pub struct AwsEc2Host {
522    /// ID from [`crate::Deployment::add_host`].
523    id: usize,
524
525    region: String,
526    instance_type: String,
527    target_type: HostTargetType,
528    ami: String,
529    network: Arc<AwsNetwork>,
530    iam_instance_profile: Option<Arc<Mutex<AwsEc2IamInstanceProfile>>>,
531    cloudwatch_log_group: Option<Arc<Mutex<AwsCloudwatchLogGroup>>>,
532    cwa_metrics_collected: Option<serde_json::Value>,
533    user: Option<String>,
534    display_name: Option<String>,
535    pub launched: OnceLock<Arc<LaunchedEc2Instance>>,
536    external_ports: Mutex<Vec<u16>>,
537}
538
539impl Debug for AwsEc2Host {
540    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
541        f.write_fmt(format_args!(
542            "AwsEc2Host({} ({:?}))",
543            self.id, &self.display_name
544        ))
545    }
546}
547
548impl AwsEc2Host {
549    #[expect(clippy::too_many_arguments, reason = "used via builder pattern")]
550    pub fn new(
551        id: usize,
552        region: impl Into<String>,
553        instance_type: impl Into<String>,
554        target_type: HostTargetType,
555        ami: impl Into<String>,
556        network: Arc<AwsNetwork>,
557        iam_instance_profile: Option<Arc<Mutex<AwsEc2IamInstanceProfile>>>,
558        cloudwatch_log_group: Option<Arc<Mutex<AwsCloudwatchLogGroup>>>,
559        cwa_metrics_collected: Option<serde_json::Value>,
560        user: Option<String>,
561        display_name: Option<String>,
562    ) -> Self {
563        Self {
564            id,
565            region: region.into(),
566            instance_type: instance_type.into(),
567            target_type,
568            ami: ami.into(),
569            network,
570            iam_instance_profile,
571            cloudwatch_log_group,
572            cwa_metrics_collected,
573            user,
574            display_name,
575            launched: OnceLock::new(),
576            external_ports: Mutex::new(Vec::new()),
577        }
578    }
579}
580
581impl Host for AwsEc2Host {
582    fn target_type(&self) -> HostTargetType {
583        self.target_type
584    }
585
586    fn request_port_base(&self, bind_type: &BaseServerStrategy) {
587        match bind_type {
588            BaseServerStrategy::UnixSocket => {}
589            BaseServerStrategy::InternalTcpPort(_) => {}
590            BaseServerStrategy::ExternalTcpPort(port) => {
591                let mut external_ports = self.external_ports.lock().unwrap();
592                if !external_ports.contains(port) {
593                    if self.launched.get().is_some() {
594                        todo!("Cannot adjust security group after host has been launched");
595                    }
596                    external_ports.push(*port);
597                }
598            }
599        }
600    }
601
602    fn request_custom_binary(&self) {
603        self.request_port_base(&BaseServerStrategy::ExternalTcpPort(22));
604    }
605
606    fn id(&self) -> usize {
607        self.id
608    }
609
610    fn collect_resources(&self, resource_batch: &mut ResourceBatch) {
611        if self.launched.get().is_some() {
612            return;
613        }
614
615        let vpc_path = self.network.collect_resources(resource_batch);
616
617        let iam_instance_profile = self
618            .iam_instance_profile
619            .as_deref()
620            .map(|irip| irip.lock().unwrap().collect_resources(resource_batch));
621
622        let cloudwatch_log_group = self
623            .cloudwatch_log_group
624            .as_deref()
625            .map(|cwlg| cwlg.lock().unwrap().collect_resources(resource_batch));
626
627        // Add additional providers
628        resource_batch
629            .terraform
630            .terraform
631            .required_providers
632            .insert(
633                "local".to_owned(),
634                TerraformProvider {
635                    source: "hashicorp/local".to_owned(),
636                    version: "2.3.0".to_owned(),
637                },
638            );
639
640        resource_batch
641            .terraform
642            .terraform
643            .required_providers
644            .insert(
645                "tls".to_owned(),
646                TerraformProvider {
647                    source: "hashicorp/tls".to_owned(),
648                    version: "4.0.4".to_owned(),
649                },
650            );
651
652        // Generate SSH key pair
653        resource_batch
654            .terraform
655            .resource
656            .entry("tls_private_key".to_owned())
657            .or_default()
658            .insert(
659                "vm_instance_ssh_key".to_owned(),
660                json!({
661                    "algorithm": "RSA",
662                    "rsa_bits": 4096
663                }),
664            );
665
666        resource_batch
667            .terraform
668            .resource
669            .entry("local_file".to_owned())
670            .or_default()
671            .insert(
672                "vm_instance_ssh_key_pem".to_owned(),
673                json!({
674                    "content": "${tls_private_key.vm_instance_ssh_key.private_key_pem}",
675                    "filename": ".ssh/vm_instance_ssh_key_pem",
676                    "file_permission": "0600",
677                    "directory_permission": "0700"
678                }),
679            );
680
681        resource_batch
682            .terraform
683            .resource
684            .entry("aws_key_pair".to_owned())
685            .or_default()
686            .insert(
687                "ec2_key_pair".to_owned(),
688                json!({
689                    "key_name": format!("hydro-key-{}", nanoid!(8, &TERRAFORM_ALPHABET)),
690                    "public_key": "${tls_private_key.vm_instance_ssh_key.public_key_openssh}"
691                }),
692            );
693
694        let instance_key = format!("ec2-instance-{}", self.id);
695        let mut instance_name = format!("hydro-ec2-instance-{}", nanoid!(8, &TERRAFORM_ALPHABET));
696
697        if let Some(mut display_name) = self.display_name.clone() {
698            instance_name.push('-');
699            display_name = display_name.replace("_", "-").to_lowercase();
700
701            let num_chars_to_cut = instance_name.len() + display_name.len() - 63;
702            if num_chars_to_cut > 0 {
703                display_name.drain(0..num_chars_to_cut);
704            }
705            instance_name.push_str(&display_name);
706        }
707
708        let network_id = self.network.id.clone();
709        let vpc_ref = format!("${{{}.id}}", vpc_path);
710        let default_sg_ref = format!(
711            "${{aws_security_group.hydro-vpc-network-{}-default-sg.id}}",
712            network_id
713        );
714
715        // Create additional security group for external ports if needed
716        let mut security_groups = vec![default_sg_ref];
717        let external_ports = self.external_ports.lock().unwrap();
718
719        if !external_ports.is_empty() {
720            let sg_key = format!("sg-{}", self.id);
721            let mut sg_rules = vec![];
722
723            for port in external_ports.iter() {
724                sg_rules.push(json!({
725                    "from_port": port,
726                    "to_port": port,
727                    "protocol": "tcp",
728                    "cidr_blocks": ["0.0.0.0/0"],
729                    "description": format!("External port {}", port),
730                    "ipv6_cidr_blocks": [],
731                    "prefix_list_ids": [],
732                    "security_groups": [],
733                    "self": false
734                }));
735            }
736
737            resource_batch
738                .terraform
739                .resource
740                .entry("aws_security_group".to_owned())
741                .or_default()
742                .insert(
743                    sg_key.clone(),
744                    json!({
745                        "name": format!("hydro-sg-{}", nanoid!(8, &TERRAFORM_ALPHABET)),
746                        "description": "Hydro external ports security group",
747                        "vpc_id": vpc_ref,
748                        "ingress": sg_rules,
749                        "egress": [{
750                            "from_port": 0,
751                            "to_port": 0,
752                            "protocol": "-1",
753                            "cidr_blocks": ["0.0.0.0/0"],
754                            "description": "All outbound traffic",
755                            "ipv6_cidr_blocks": [],
756                            "prefix_list_ids": [],
757                            "security_groups": [],
758                            "self": false
759                        }]
760                    }),
761                );
762
763            security_groups.push(format!("${{aws_security_group.{}.id}}", sg_key));
764        }
765        drop(external_ports);
766
767        let subnet_ref = format!("${{aws_subnet.hydro-vpc-network-{}-subnet.id}}", network_id);
768        let iam_instance_profile_ref = iam_instance_profile.map(|key| format!("${{{key}.name}}"));
769
770        // Write the CloudWatch Agent config file.
771        // https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html
772        let cloudwatch_agent_config = cloudwatch_log_group.map(|cwlg| {
773            json!({
774                "logs": {
775                    "logs_collected": {
776                        "files": {
777                            "collect_list": [
778                                {
779                                    "file_path": "/var/log/hydro/metrics.log",
780                                    "log_group_name": format!("${{{cwlg}.name}}"), // This `$` is interpreted by terraform
781                                    "log_stream_name": "{{instance_id}}"
782                                }
783                            ]
784                        }
785                    }
786                },
787                "metrics": {
788                    // "namespace": todo!(), // TODO(mingwei): use flow_name here somehow
789                    "metrics_collected": self.cwa_metrics_collected.as_ref().unwrap_or(&json!({
790                        "cpu": {
791                            "resources": [
792                                "*"
793                            ],
794                            "measurement": [
795                                "usage_active"
796                            ],
797                            "totalcpu": true
798                        },
799                        "mem": {
800                            "measurement": [
801                                "used_percent"
802                            ]
803                        }
804                    })),
805                    // See special escape handling below.
806                    "append_dimensions": {
807                        "InstanceId": "${aws:InstanceId}"
808                    }
809                }
810            })
811            .to_string()
812        });
813
814        // TODO(mingwei): Run this in SSH instead of `user_data` to avoid racing and capture errors.
815        let user_data_script = cloudwatch_agent_config.map(|cwa_config| {
816            let cwa_config_esc = cwa_config
817                .replace("\\", r"\\") // escape backslashes
818                .replace("\"", r#"\""#) // escape quotes
819                .replace("\n", r"\n") // escape newlines
820                // Special handling of AWS `append_dimensions` fields:
821                // `$$` to escape for terraform, becomes `\$` in bash, becomes `$` in echo output.
822                .replace("${aws:", r"\$${aws:");
823            format!(
824                r##"
825#!/bin/bash
826set -euxo pipefail
827
828mkdir -p /var/log/hydro/
829chmod +777 /var/log/hydro
830touch /var/log/hydro/metrics.log
831chmod +666 /var/log/hydro/metrics.log
832
833# Install the CloudWatch Agent
834yum install -y amazon-cloudwatch-agent
835
836mkdir -p /opt/aws/amazon-cloudwatch-agent/etc
837echo -e "{cwa_config_esc}" > /opt/aws/amazon-cloudwatch-agent/etc/amazon-cloudwatch-agent.json
838
839# Start or restart the agent
840/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl \
841    -a fetch-config -m ec2 \
842    -c file:/opt/aws/amazon-cloudwatch-agent/etc/amazon-cloudwatch-agent.json \
843    -s
844"##
845            )
846        });
847
848        // Create EC2 instance
849        resource_batch
850            .terraform
851            .resource
852            .entry("aws_instance".to_owned())
853            .or_default()
854            .insert(
855                instance_key.clone(),
856                json!({
857                    "ami": self.ami,
858                    "instance_type": self.instance_type,
859                    "key_name": "${aws_key_pair.ec2_key_pair.key_name}",
860                    "vpc_security_group_ids": security_groups,
861                    "subnet_id": subnet_ref,
862                    "associate_public_ip_address": true,
863                    "iam_instance_profile": iam_instance_profile_ref, // May be `None`.
864                    "user_data": user_data_script, // May be `None`.
865                    "tags": {
866                        "Name": instance_name
867                    }
868                }),
869            );
870
871        resource_batch.terraform.output.insert(
872            format!("{}-private-ip", instance_key),
873            TerraformOutput {
874                value: format!("${{aws_instance.{}.private_ip}}", instance_key),
875            },
876        );
877
878        resource_batch.terraform.output.insert(
879            format!("{}-public-ip", instance_key),
880            TerraformOutput {
881                value: format!("${{aws_instance.{}.public_ip}}", instance_key),
882            },
883        );
884    }
885
886    fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
887        self.launched
888            .get()
889            .map(|a| a.clone() as Arc<dyn LaunchedHost>)
890    }
891
892    fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
893        self.launched
894            .get_or_init(|| {
895                let id = self.id;
896
897                let internal_ip = resource_result
898                    .terraform
899                    .outputs
900                    .get(&format!("ec2-instance-{id}-private-ip"))
901                    .unwrap()
902                    .value
903                    .clone();
904
905                let external_ip = resource_result
906                    .terraform
907                    .outputs
908                    .get(&format!("ec2-instance-{id}-public-ip"))
909                    .map(|v| v.value.clone());
910
911                Arc::new(LaunchedEc2Instance {
912                    resource_result: resource_result.clone(),
913                    user: self.user.clone().unwrap_or_else(|| "ec2-user".to_owned()),
914                    internal_ip,
915                    external_ip,
916                })
917            })
918            .clone()
919    }
920
921    fn strategy_as_server<'a>(
922        &'a self,
923        client_host: &dyn Host,
924        network_hint: PortNetworkHint,
925    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
926        if matches!(network_hint, PortNetworkHint::Auto)
927            && client_host.can_connect_to(ClientStrategy::UnixSocket(self.id))
928        {
929            Ok((
930                ClientStrategy::UnixSocket(self.id),
931                Box::new(|_| BaseServerStrategy::UnixSocket),
932            ))
933        } else if matches!(
934            network_hint,
935            PortNetworkHint::Auto | PortNetworkHint::TcpPort(_)
936        ) && client_host.can_connect_to(ClientStrategy::InternalTcpPort(self))
937        {
938            Ok((
939                ClientStrategy::InternalTcpPort(self),
940                Box::new(move |_| {
941                    BaseServerStrategy::InternalTcpPort(match network_hint {
942                        PortNetworkHint::Auto => None,
943                        PortNetworkHint::TcpPort(port) => port,
944                    })
945                }),
946            ))
947        } else if matches!(network_hint, PortNetworkHint::Auto)
948            && client_host.can_connect_to(ClientStrategy::ForwardedTcpPort(self))
949        {
950            Ok((
951                ClientStrategy::ForwardedTcpPort(self),
952                Box::new(|me| {
953                    me.downcast_ref::<AwsEc2Host>()
954                        .unwrap()
955                        .request_port_base(&BaseServerStrategy::ExternalTcpPort(22));
956                    BaseServerStrategy::InternalTcpPort(None)
957                }),
958            ))
959        } else {
960            anyhow::bail!("Could not find a strategy to connect to AWS EC2 instance")
961        }
962    }
963
964    fn can_connect_to(&self, typ: ClientStrategy) -> bool {
965        match typ {
966            ClientStrategy::UnixSocket(id) => {
967                #[cfg(unix)]
968                {
969                    self.id == id
970                }
971
972                #[cfg(not(unix))]
973                {
974                    let _ = id;
975                    false
976                }
977            }
978            ClientStrategy::InternalTcpPort(target_host) => {
979                if let Some(aws_target) = <dyn Any>::downcast_ref::<AwsEc2Host>(target_host) {
980                    self.region == aws_target.region
981                        && Arc::ptr_eq(&self.network, &aws_target.network)
982                } else {
983                    false
984                }
985            }
986            ClientStrategy::ForwardedTcpPort(_) => false,
987        }
988    }
989}