Apache Airflow

안녕하세요 오늘은 베스핀글로벌 D&A 한제호님이 작성해 주신 Apache Airflow에 대해 알아보겠습니다.

궁금하신 부분이 있으시면 댓글을 달아주세요 🙂

1. 개요

Airbnb에서 개발
2016년 아파치 재단 incubator project
현재 아파치 탑레벨 프로젝트
기존 workflow의 문제점(예: crantab)
  • 실패 복구
  • 모니터링
  • 의존성 관리
  • 확장성
  • 배포
  • 워크플로우를 작성하고 스케줄링하고 모니터링 하는 작업을 프로그래밍 할 수 있게 해주는 플랫폼
python 기반 개발
  • 분산 환경에서 확정성
  • 웹 대시보드
  • 여러 플러그인 모듈을 통해 커스터마이징 가능

2. 용어

workflow: 의존성으로 연결된 작업(task)들의 집합
웹서버: 웹 대시보드 UI
스케줄러: 워크플로우가 언제 실행되는지 관리
Metastore: 메타데이터 관리(예: DAG정보)
Executor: 테스크가 어떻게 실행되는지 정의
Worker: 테스트를 실행하는 프로세스
DAG(Directed Acyclic Graph): 방향성 비순환 그래프
  • DAG 인 경우
  • DAG가 아닌 경우
Operator: 작업(Task)를 정의하는데 사용
Task: Operator Instance(Operator가 실행되면[runtime] Task가 됨)

3. Architecture

One Node Architecture
  • Metastore에는 dag 정보를 가지고 있기 때문에 Webserver 및 Scheduler에서 dag정보를 읽어 Executor를 통해 실행함
  • Executor를 통해 실행된 Task의 상태정보는 주기적으로 Metastore에 업데이터 되며 해당 정보를 WebServer와 Scheduler가 참조하게 됨
  • Executor내에는 Queue가 존재하여 Task의 처리 순서를 보장하게 됨
Multi-Node Architecture
  • Queue가 Executor 밖에 존재함
  • Master Node, Worker Node, Metastore로 물리적인 환경이 분리되어 있음

4. DAG의 생성과 실행 과정

개발자가 새로운 DAG를 작성 후 Floder DAGs 안에 배치
Web Server와 Scheduler가 DAG를 파싱
  • 이 과정에서 코드에 대한 컴파일 오류(Syntax)가 체크됨
Scheduler가 Metastore를 통해 DagRun 오브젝트를 생성
  • DagRun은 사용자가 작성한 DAG의 인스턴스
  • DagRun status: Running
Task Instance 오브젝트 스케줄링
TaskInstance를 Executor로 보냄
완료 후 Executor는 Metastore에 완료 했다고 보고
Scheduler: DAG 실행의 완료 여부 체크
DagRun status: Completed
  • UI 업데이트

https://www.youtube.com/watch?v=UFsCvWjQT4w

5. Web UI

DAGs
  • pause/unpause: 해당 dag 활성화/비활성화
  • DAG: DAG 이름(태그포함)
  • Owner: DAG 생성자 및 관리자
  • Runs: DAG 상태
  • Schedule: DAG 실행 주기(crontab 형식)
  • Last Run: 마지막 실행 시간
  • Next Run: 다음 실행 시간
  • Recent Tasks: 실행된 Task의 상태(예: upstream_failed→의존성이 있는 경우 상위 task 실패시 표시됨)
주요 모니터링 화면
  • Grid: 스케줄 단위로 실행된 DAG의 Task 상태 모니터링, 로그 확인, 재실행등 기능 제공
  • Graph: Task별로 종속 관계 확인 가능
  • Gantt: Dag내의 Task별로 소요 시간, 병렬성 관계 확인 가능. 운용시 각 Task별로 지연시간등을 확인하여 튜닝에 활용 가능

6. Operator

내장 Operators
  • BashOperator
  • PythonOperator
  • EmailOperator
Action Operators: Action을 실행 시 사용
Transfer Operators: 데이터를 옮길때 사용
Sensors Operators: 조건이 맞을때까지 기다릴때 사용
외부 Provider: airflow에서 제공하는 추가 외부 패키지

7. Catch Up & Backfill

date의 이해
  • start_date: DAG 코드상에 입력되는 최초 날짜 데이터. 해당 값을 기준으로 Catchup이 동작함
  • execution_date(logical_date)
  • dag가 실행되어야 하는 기대값
    • 매일 수행되는 배치에서 전일 데이터를 가져와서 처리하는 경우 airflow의 경우 execution_date를 활용하여 동적으로 처리 가능함
    • 배치 시간에 따라 또는 DAG가 수행될때마다 변경되는 데이터
    • UTC 기준 시간이기 때문에 한국 시간으로의 변경 필요
YESTERDAY = '{{ execution_date.in_timezone("Asia/Seoul").strftime("%Y-%m-%d") }}'
TODAY = '{{ next_execution_date.in_timezone("Asia/Seoul").strftime("%Y-%m-%d") }}'
Catch Up
  • Scheduler는 전체 DAG의 lifetime을 확인하면서 실행되지 않은 DAG를 실행함
  • 이렇게 정해진 시간에 실행되지 못한 DAG를 늦게라도 실행하는것을 Catch up 이라고 함
  • 코드상에 CacheUp 옵션을 통해 활성화/비활성화 처리 가능
  • 신규 DAG 적용시 과거 데이터에 대한 마이그레이션이 필요한 경우
  • 일정 기간 중지된 DAG를 다시 시작시 중지된 기간만큼의 데이터 마이그레이션이 필요한 경우
  • (주의사항)Catchup을 할 때 DagRun이 한번에 실행 되기 때문에 서버에 과부하가 올 수 있으므로 아래 옵션을 통해 일부 조정이 필요함
    • max_active_runs : DAG수준에서 설정 되며, Catch up 중에 DAGrun이 얼마나 실행 될 수 있는지를 설정
    • depends_on_past : 작업 수준에 설정 되며, 가장 최근에 DAG에서 동일한 작업이 수행 되었을 때 작업이 수행 될 수 있도록 제약
    • wait_for_downstream : DAG 수준에서 설정되며 다음 DAG를 실행 하려면 전체 task들이 수행 되어야 실행
    • catchup_by_default : config파일에서 설정이 가능하며 DAG를 만들 때 기본 값 True, False를 정할 수 있음
Backfill
  • Catch Up과 비슷한 역할이긴 하나 몇가지 차이점이 있음
  • start_date 이전의 날짜를 명시하여 실행 가능
  • 보통은 전체 DAG를 재 실행하는 용도보다는 실패한 특정 Task에 대해 재실행하는데 사용됨
# 지정한 기간동안 backfill 수행하지 않을 날짜만 수행

airflow dags backfill --start-date {date} --end-date {date} dag_id
# 지정한 기간동안 backfill 모든 재실행
airflow dags backfill --start-date {date} --end-date {date} --reset-dagruns dag_id
# 지정한 기간동안 실패한 task들만 재실행
airflow dags backfill --start-date {date} --end-date {date} --rerun-failed-tasks

감사합니다 🙂

Leave a Comment