본 실습에서는 S3에 데이터 적재하기 에서 실습한 코드를 Airflow로 구현한다.

실습

실습 코드: airflow/dags/upbit_hourly_dag.py

1. 업비트 API로 분봉 데이터를 받아 S3에 업데이트하는 python script 작성

실습 코드: airflow/operators/runner.py

4.2 실습 코드와 거의 유사

2. BashOperator로 1번 python script 실행하는 DAG 제작

  1. 파라미터로 입력하는 execution_date 를 Jinja Template으로 불러온다. (참고: Airflow templates reference)

    execution_date = '{{((dag_run.logical_date + macros.timedelta(hours=9))).strftime("%Y-%m-%d %H:%M:%S")}}'
    
  2. 1번에서 제작한 script를 실행하는 DAG 생성

    upbit_update = BashOperator(
        task_id="upbit_update",
        dag=dag,
        bash_command=f"python3 /opt/airflow/operators/runner.py --m KRW-BTC --t '{execution_date}' --c 60"
    )
    

DAG 실행 모습