
I have a regular ETL job that runs on an AWS EC2 instance.

The workflow looks like the following (a minimal sketch in code follows the list):

  1. Bring up the EC2 instance using the EC2StartInstanceOperator.
  2. Look up the public IP with a boto3 call wrapped in a PythonOperator. This operator pushes the IP to XCom.
  3. Establish an SSH hook using the public IP and run a remote command using the SSHOperator.
  4. Stop the EC2 instance upon completion using the EC2StopInstanceOperator.
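
For concreteness, here is a minimal sketch of that DAG, assuming Airflow 2.x with the amazon and ssh providers installed. The instance id, connection id, and command are placeholders, and import paths can differ slightly between provider versions:

```python
from datetime import datetime

import boto3
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.ec2_start_instance import EC2StartInstanceOperator
from airflow.providers.amazon.aws.operators.ec2_stop_instance import EC2StopInstanceOperator
from airflow.providers.ssh.operators.ssh import SSHOperator

INSTANCE_ID = "i-0123456789abcdef0"  # placeholder instance id

def get_public_ip():
    """Return the instance's current public IP; as the task's return
    value it is pushed to XCom automatically (key 'return_value')."""
    ec2 = boto3.client("ec2")
    reservations = ec2.describe_instances(InstanceIds=[INSTANCE_ID])["Reservations"]
    return reservations[0]["Instances"][0]["PublicIpAddress"]

with DAG(
    dag_id="etl_on_ec2",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    start = EC2StartInstanceOperator(task_id="start_instance", instance_id=INSTANCE_ID)
    get_ip = PythonOperator(task_id="get_ip", python_callable=get_public_ip)
    # Step 3 is the problematic one: the fresh IP from XCom somehow has
    # to reach the SSH connection used by this operator.
    run_etl = SSHOperator(task_id="run_etl", ssh_conn_id="my_ssh_conn", command="./run_etl.sh")
    stop = EC2StopInstanceOperator(task_id="stop_instance", instance_id=INSTANCE_ID)
    start >> get_ip >> run_etl >> stop
```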

The issues with the above are:

  1. The SSH hook (airflow.providers.ssh.hooks.ssh.SSHHook in Airflow 2.0) cannot access XCom; only operators can.
  2. AWS EC2 instances are not guaranteed the same public IP across runs, so the PythonOperator has to look up the public IP on every run.

Thanks!

abhinavkulkarni

1 Answer


You can use the SSHOperator's remote_host parameter to set the host to connect to. This setting "will replace the remote_host which was defined in ssh_hook or predefined in the connection of ssh_conn_id."

The remote_host parameter is templated, so you can read the IP from XCom using a template macro.
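
For example, assuming the IP was pushed to XCom by a task with task_id get_ip (step 2 in the question), something along these lines should work:

```python
from airflow.providers.ssh.operators.ssh import SSHOperator

run_etl = SSHOperator(
    task_id="run_etl",
    ssh_conn_id="my_ssh_conn",  # placeholder connection id
    # Jinja template, rendered at run time: pulls the return value
    # of the 'get_ip' task from XCom and uses it as the SSH host.
    remote_host="{{ ti.xcom_pull(task_ids='get_ip') }}",
    command="./run_etl.sh",  # placeholder command
)
```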

SergiyKolesnikov
  • How do I do that? I'd have to use the `XCom.get_one` method to get the value; however, that method requires an `execution_date` parameter, which is part of a DAGRun, not the DAG. – abhinavkulkarni Apr 10 '21 at 01:32
  • @abhinavkulkarni You can use {{ ti.xcom_pull(...) }}, as shown in the link I provided. Without a minimal reproducible example of your code, it is hard to discuss details. – SergiyKolesnikov Apr 10 '21 at 07:20
  • I ended up subclassing the `SSHOperator` class, whose `execute` method has access to `context`, from which I can pull the XCom value. Thanks! – abhinavkulkarni Apr 10 '21 at 09:45
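
For reference, a minimal sketch of that subclassing approach (the class name is hypothetical, and task_ids='get_ip' matches the sketch in the question):

```python
from airflow.providers.ssh.operators.ssh import SSHOperator

class XComHostSSHOperator(SSHOperator):
    """SSHOperator variant that resolves its remote host from XCom."""

    def execute(self, context):
        # The upstream task pushed the instance's public IP to XCom;
        # SSHOperator.execute() applies self.remote_host to its hook.
        self.remote_host = context["ti"].xcom_pull(task_ids="get_ip")
        return super().execute(context)
```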