안녕하세요 오늘은 BESPIN GLOBAL Data실 한제호님이 작성해주신 ‘Spark 7편: Optimising Shuffle Partitions(coalescePartitions)’ 에 대해 소개해드리도록 하겠습니다.
목차
1. AQE 기능
2. Spark 버전별 최적화 모듈 진화 과정
3. coalescePartitions
1. AQE 기능
- Spark 3.0으로 업데이트 되면서 가장 좋은 기능으로 판단되는 기능이 AQE 기능입니다.
- Spark내의 Optimizer인 Spark Catalyst 기능은 Spark 기능중에 가장 중요한 계층의 기능입니다. Spark Catalyst는 논리 계획에서 물리계획으로 변경되는 시점에 쿼리 최적화를 실행합니다. 즉, 물리계획이 만들어지고 계획이 실행된 다음에는 최적화를 실행하지 않습니다.
- Spark 3.0에서는 추가 최적화 계층으로 AQE를 도입하였으며 RunTime시에 실행의 일부로 수집된 메트릭 정보에 따라 쿼리를 재 최적화합니다. 재 최적화는 Spark Job내에서 각 Stage 가 끝나는 시점에 수행합니다. 최종적으로 사용자 입장에서는 Spark Optimizer를 통해 Plan이 만들어질때와 stage가 넘어가기 직전 크게 2가지 측면에서 최적화 작업이 발생합니다.
2. Spark 버전별 최적화 모듈 진화 과정
- Spark 버전별 최적화 모듈 진화 과정
- Spark 1.x – Catalyst Optimizer 및 Tungsten Execution Engine 도입
- Spark 2.x – Cost Model 기반의 Optimizer 추가
- Spark 3.0 – Adaptive Query Execution 추가
3. coalescePartitions
- 이번 게시글에서는 Spark3.0 AQE 기능중에 coalescePartitions에 대해 소개하려 합니다.
- Spark 환경에서의 셔플(파티션간의 데이터 이동)은 쿼리 성능에 지대한 영향을 미친다. 셔플이 발생할 때 파티션 수는 shuffle partition 설정 값에 따라 결정된다. 그래서 셔플 파티션 수 크기에 따라 아래와 같은 문제가 발생할 수 있습니다.
- 셔플 파티션 수가 작은 경우
- 각 셔플 파티션에 적재되는 데이터가 큰 경우 Disk를 활용(disk spill)하는 경우가 생기기 때문에 쿼리 성능 저하 발생
- 셔플 파티션 수가 큰 경우
- 각 셔플 파티션에 저장되는 데이터가 작은 경우 많은 셔플 파티션들을 읽어야 하기 때문에 비효율적인 네트워크 I/O로 인해 쿼리 성능 저하 발생
- 셔플 파티션 수가 작은 경우
- 따라서 기존에는 셔플파티션 수를 사용자가 판단하여 적절하게 설정하는 방식으로 최적화해 왔습니다. (Spark 2.x까지는 사용자가 알아서 튜닝해야 했습니다.)
- AQE에서는 각 stage가 끝남과 동시에 셔플 통계정보를 바탕으로 많은수의 작은 파티션을 합쳐서 큰 파티션으로 뭉쳐주는 기능을 제공합니다.

- AQE 기능이 비활성화 되어 있는 경우
- join 후 셔플파티션 수 만큼 파티션 생성됨
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", 200)
sales_df = spark.read.option("header", "true").csv("./dataset/sales/sales.csv")
return_df = spark.read.option("header", "true").csv("./dataset/sales/return.csv")
sales_df.createOrReplaceTempView("sales")
return_df.createOrReplaceTempView("return")
join_df = spark.sql("""
SELECT sales.ORDER_NO, sales.ORDER_DATE, sales.CUSTOMER_NO, return.RETURN
FROM sales FULL JOIN return ON sales.ORDER_NO = return.ORDER_NO
""")
join_df.write.mode("overwrite").csv("./dataset/output/sales/shuffle/")

sparkui상에 stage 로그를 보면 join 이후 디폴트 셔플 파티션 수에 따라 200개로 늘어난 것을 볼 수 있습니다.
여기까지 ‘Spark 6편: Yarn Resource Manager 라벨링’에 대해 소개해드렸습니다. 유익한 정보가 되셨길 바랍니다. 감사합니다.
Written by 한 제호 / Data실
BESPIN GLOBAL