안녕하세요:) 오늘은 베스핀글로벌 D&A실 한제호님이 작성해 주신 ‘Spark Backend Service – Optimizer’에 대해 알아보겠습니다. 궁금하신 부분이 있으시면 댓글을 달아주세요!
Spark Backend Service
- Spark은 Lazy evaluation 방식으로 동작한다.
- 많은 이유가 있겠지만 가장 큰 이유는 데이터를 변환하는 과정에서 단일 Transformation로의 튜닝보다 다수의 Tranformation을 모아서 튜닝하는 것이 훨씬 효율적이기 때문이다.
- Spark은 쿼리(Dataframe포함)를 수행하기 위해서는 내부적으로 두 가지 엔진이 동작한다. (Spark 2.x 까지는…)
Catalyst Project
- 사용자가 작성한 코드를 실행 가능한 Plan으로 변경해 주는 작업 담당한다.
Logical Plan을 Physical Plan으로 변경해 주는 역할을 담당한다.
Plan 종류별 역할
- Logical Plan은 작성된 코드를 기반으로 각 Tranformation 단계에 대한 추상화까지만 정의한다. 즉, 최적화가 적용되지 않고 단순히 데이터가 어떻게 변해야 하는지만 정의한다. 실제 어느 Executor에서 또는 어느 RDD에서 동작하는지는 정의하지 않는다. 보통 기본적인 code inspection 활동(컬럼 이름이 맞지 않거나, 데이터 타입이 맞지 않는…)에서 발생하는 오류를 catch한다.
- Physical Plan은 Local Plan을 기반으로 Cost Model 실행 전략에 따라 최적화 작업이 수행된다.(어떻게 클러스터 위에서 실행할지…)
Catalyst Pipeline
분석
- DataFrame 객체의 relation 계산, 컬럼 및 타입 등 확인 작업 수행
Logical Plan 최적화
- 다양한 표현식을 기반으로 Compile Time 계산
- Predicate Pushdown: join & filter → filter & join 형태로 변경 (ex: GroupByKey → ReduceByKey)
- Projection Pruning: 연산에 필요한 칼럼만 가져오기
Physical Plan
- Spark에서 실행 가능한 Plan으로 변환
Code Generation
- 최적화된 Physical Plan을 Java Bytecode로 변환
최적화 예시: 논리 실행 계획에서 물리 실행 계획으로 변경 과정에서 Predicate Pushdown 자동 적용 예시
sales_df = spark.read.option("header", "true").csv("./dataset/sales/sales.csv")
return_df = spark.read.option("header", "true").csv("./dataset/sales/return.csv")
sales_df.createOrReplaceTempView("sales")
return_df.createOrReplaceTempView("return")
join_df = spark.sql("""
SELECT sales.ORDER_NO, sales.ORDER_DATE, sales.CUSTOMER_NO, return.RETURN
FROM sales FULL JOIN return ON sales.ORDER_NO = return.ORDER_NO
WHERE sales.SHIPPING_WAY = '표준 배송'
""")
join_df.explain(True)
Tungsten Project
- Catalyst를 통해 최적화된 Query Plan을 기반으로 좀 더 Row Level(CPU, 메모리) 측면에서 최적화 작업을 담당
감사합니다 🙂
좋은글 감사합니다.