본 실습에서는 S3에 데이터 적재하기 에서 실습한 코드를 Airflow로 구현한다.
실습 코드: airflow/dags/upbit_hourly_dag.py
실습 코드: airflow/operators/runner.py
4.2 실습 코드와 거의 유사
파라미터로 입력하는 execution_date 를 Jinja Template으로 불러온다. (참고: Airflow templates reference)
execution_date = '{{((dag_run.logical_date + macros.timedelta(hours=9))).strftime("%Y-%m-%d %H:%M:%S")}}'
1번에서 제작한 script를 실행하는 DAG 생성
upbit_update = BashOperator(
task_id="upbit_update",
dag=dag,
bash_command=f"python3 /opt/airflow/operators/runner.py --m KRW-BTC --t '{execution_date}' --c 60"
)
Graph
실행 화면 및 로그