안녕하세요 오늘은 베스핀글로벌 D&A 한제호님이 작성해 주신 Apache Airflow에 대해 알아보겠습니다.
궁금하신 부분이 있으시면 댓글을 달아주세요 🙂
1. 개요
Airbnb에서 개발
2016년 아파치 재단 incubator project
현재 아파치 탑레벨 프로젝트
기존 workflow의 문제점(예: crantab)
- 실패 복구
- 모니터링
- 의존성 관리
- 확장성
- 배포
- 워크플로우를 작성하고 스케줄링하고 모니터링 하는 작업을 프로그래밍 할 수 있게 해주는 플랫폼
python 기반 개발
- 분산 환경에서 확정성
- 웹 대시보드
- 여러 플러그인 모듈을 통해 커스터마이징 가능
2. 용어
workflow: 의존성으로 연결된 작업(task)들의 집합
웹서버: 웹 대시보드 UI
스케줄러: 워크플로우가 언제 실행되는지 관리
Metastore: 메타데이터 관리(예: DAG정보)
Executor: 테스크가 어떻게 실행되는지 정의
Worker: 테스트를 실행하는 프로세스
DAG(Directed Acyclic Graph): 방향성 비순환 그래프
- DAG 인 경우
- DAG가 아닌 경우
Operator: 작업(Task)를 정의하는데 사용
Task: Operator Instance(Operator가 실행되면[runtime] Task가 됨)
3. Architecture
One Node Architecture
- Metastore에는 dag 정보를 가지고 있기 때문에 Webserver 및 Scheduler에서 dag정보를 읽어 Executor를 통해 실행함
- Executor를 통해 실행된 Task의 상태정보는 주기적으로 Metastore에 업데이터 되며 해당 정보를 WebServer와 Scheduler가 참조하게 됨
- Executor내에는 Queue가 존재하여 Task의 처리 순서를 보장하게 됨
Multi-Node Architecture
- Queue가 Executor 밖에 존재함
- Master Node, Worker Node, Metastore로 물리적인 환경이 분리되어 있음
4. DAG의 생성과 실행 과정
개발자가 새로운 DAG를 작성 후 Floder DAGs 안에 배치
Web Server와 Scheduler가 DAG를 파싱
- 이 과정에서 코드에 대한 컴파일 오류(Syntax)가 체크됨
Scheduler가 Metastore를 통해 DagRun 오브젝트를 생성
- DagRun은 사용자가 작성한 DAG의 인스턴스
- DagRun status: Running
Task Instance 오브젝트 스케줄링
TaskInstance를 Executor로 보냄
완료 후 Executor는 Metastore에 완료 했다고 보고
Scheduler: DAG 실행의 완료 여부 체크
DagRun status: Completed
- UI 업데이트
https://www.youtube.com/watch?v=UFsCvWjQT4w
5. Web UI
DAGs
- pause/unpause: 해당 dag 활성화/비활성화
- DAG: DAG 이름(태그포함)
- Owner: DAG 생성자 및 관리자
- Runs: DAG 상태
- Schedule: DAG 실행 주기(crontab 형식)
- Last Run: 마지막 실행 시간
- Next Run: 다음 실행 시간
- Recent Tasks: 실행된 Task의 상태(예: upstream_failed→의존성이 있는 경우 상위 task 실패시 표시됨)
주요 모니터링 화면
- Grid: 스케줄 단위로 실행된 DAG의 Task 상태 모니터링, 로그 확인, 재실행등 기능 제공
- Graph: Task별로 종속 관계 확인 가능
- Gantt: Dag내의 Task별로 소요 시간, 병렬성 관계 확인 가능. 운용시 각 Task별로 지연시간등을 확인하여 튜닝에 활용 가능
6. Operator
내장 Operators
- BashOperator
- PythonOperator
- EmailOperator
Action Operators: Action을 실행 시 사용
Transfer Operators: 데이터를 옮길때 사용
Sensors Operators: 조건이 맞을때까지 기다릴때 사용
외부 Provider: airflow에서 제공하는 추가 외부 패키지
7. Catch Up & Backfill
date의 이해
- start_date: DAG 코드상에 입력되는 최초 날짜 데이터. 해당 값을 기준으로 Catchup이 동작함
- execution_date(logical_date)
- dag가 실행되어야 하는 기대값
- 매일 수행되는 배치에서 전일 데이터를 가져와서 처리하는 경우 airflow의 경우 execution_date를 활용하여 동적으로 처리 가능함
- 배치 시간에 따라 또는 DAG가 수행될때마다 변경되는 데이터
- UTC 기준 시간이기 때문에 한국 시간으로의 변경 필요
YESTERDAY = '{{ execution_date.in_timezone("Asia/Seoul").strftime("%Y-%m-%d") }}'
TODAY = '{{ next_execution_date.in_timezone("Asia/Seoul").strftime("%Y-%m-%d") }}'
Catch Up
- Scheduler는 전체 DAG의 lifetime을 확인하면서 실행되지 않은 DAG를 실행함
- 이렇게 정해진 시간에 실행되지 못한 DAG를 늦게라도 실행하는것을 Catch up 이라고 함
- 코드상에 CacheUp 옵션을 통해 활성화/비활성화 처리 가능
- 신규 DAG 적용시 과거 데이터에 대한 마이그레이션이 필요한 경우
- 일정 기간 중지된 DAG를 다시 시작시 중지된 기간만큼의 데이터 마이그레이션이 필요한 경우
- (주의사항)Catchup을 할 때 DagRun이 한번에 실행 되기 때문에 서버에 과부하가 올 수 있으므로 아래 옵션을 통해 일부 조정이 필요함
- max_active_runs : DAG수준에서 설정 되며, Catch up 중에 DAGrun이 얼마나 실행 될 수 있는지를 설정
- depends_on_past : 작업 수준에 설정 되며, 가장 최근에 DAG에서 동일한 작업이 수행 되었을 때 작업이 수행 될 수 있도록 제약
- wait_for_downstream : DAG 수준에서 설정되며 다음 DAG를 실행 하려면 전체 task들이 수행 되어야 실행
- catchup_by_default : config파일에서 설정이 가능하며 DAG를 만들 때 기본 값 True, False를 정할 수 있음
Backfill
- Catch Up과 비슷한 역할이긴 하나 몇가지 차이점이 있음
- start_date 이전의 날짜를 명시하여 실행 가능
- 보통은 전체 DAG를 재 실행하는 용도보다는 실패한 특정 Task에 대해 재실행하는데 사용됨
# 지정한 기간동안 backfill 수행하지 않을 날짜만 수행
airflow dags backfill --start-date {date} --end-date {date} dag_id
# 지정한 기간동안 backfill 모든 재실행
airflow dags backfill --start-date {date} --end-date {date} --reset-dagruns dag_id
# 지정한 기간동안 실패한 task들만 재실행
airflow dags backfill --start-date {date} --end-date {date} --rerun-failed-tasks
감사합니다 🙂