안녕하세요 오늘은 BESPIN GLOBAL Data실 한제호님이 작성해주신 ‘Spark 3편 – Optimize Partition’ 에 대해 소개해드리도록 하겠습니다.
목차
1. 파티션의 개념
2. 파티션 활용의 잘못된 사례
3. 파티션 종류
4. Output Partition
5. Shuffle Partition
1. 파티션의 개념
- 클러스터 전체에 작업을 분산하고 각 노드의 메모리 요구사항을 줄이기 위해 Spark은 데이터를 파티션이라는 더 작은 단위로 분할 합니다.

- 하나의 데이터 세트에 대해 각 파티션의 논리적인 집합을 RDD라고 함
- 파티션 수에 따라 병렬화 정도가 결정됨
- 1 Core = 1 Task = 1 Partition
- 파티션 수 → Core 수
- 파티션 크기 → 메모리 크기
- 예시
dataframe은 18개의 파티션으로 구성되어 있으며 5 core로 executor가 구성되어 있어 동시에 5개의 파티션이 실행됩니다.

2. 파티션 활용의 잘못된 사례
- 데이터 왜곡
- 핫 파티션 발생

- P1 파티션이 P2 및 P3 파티션보다 데이터를 더 많이 가지고 있으므로 전체적은 처리 시간이 지연됨
- 스케줄링
- executor 별 보유한 Task 수 대비 파티션 수가 작아서 컴퓨팅 파워 낭비 발생

- Executor1의 경우 Executor2에 비해 파티션 수가 많으므로 2배의 시간 소요. Executor2의 경우 낭비되는 Task 발생
- 해결책
- 적절한 파티션수 조정을 통해 성능 향상을 가져올 수 있음
3. 파티션 종류
- Input Partition
- 관련 설정: spark.sql.files.maxPartitionBytes
- 파일형태의 데이터를 읽을때 생성되는 파티션이며 위 옵션에 따라 파티션당 최대 사이즈가 결정됩니다.
- Default: 134217728 (128MB)
- 단일 파일이 아주 큰 경우 해당 값을 늘려서 파티션 수를 줄일 수 있습니다.
- 파일이 작은 경우 파일 단위로 파티션이 생성됩니다.
- 예시
- 500MB로 설정
spark.conf.set("spark.sql.files.maxPartitionBytes", "524288000") # 500MB
trip_df1 = spark.read.csv("s3://hjh-ap-northeast-2/dataset/tripdate/big_data/part-00000-ddeeda58-424e-43fa-bd31-6057d816f97c-c000.csv")
print(f"partition num: {trip_df1.rdd.getNumPartitions()}") # output -> partition num: 5

- 128MB로 설정
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728") # 128MB
trip_df1 = spark.read.csv("s3://hjh-ap-northeast-2/dataset/tripdate/big_data/part-00000-ddeeda58-424e-43fa-bd31-6057d816f97c-c000.csv")
print(f"partition num: {trip_df1.rdd.getNumPartitions()}") # output -> partition num: 18

- 대부분 Spark 활용 시 해당 값을 조절하는 경우는 거의 없음
4. Output Partition
- 관련 설정: df.repartition(cnt or column), df.coalesce(cnt)
- 파일을 저장할 때 생성되는 파티션이며 해당 값에 따라 쓰기시 파일 수가 결정됩니다.
- HDFS 및 Object Storage에 저장 할때는 큰 파일로 저장하기를 권고합니다.
- coalesce는 파티션을 효율적으로 줄이는 경우만 사용되는 반면 repartition 파티션을 늘리거나 줄일때 사용가능합니다.
- 예시
- 파티션 조정 없이 Write 시
trip_df = spark.read.option("header", "true").csv("./dataset/taxi/part-00000-ddeeda58-424e-43fa-bd31-6057d816f97c-c000.csv")
trip_df.write.mode("overwrite").csv("./dataset/output/taxi/default/")


- coalesce(5)
trip_df = spark.read.option("header", "true").csv("./dataset/taxi/part-00000-ddeeda58-424e-43fa-bd31-6057d816f97c-c000.csv")
trip_df = trip_df.coalesce(5)
trip_df.write.mode("overwrite").csv("./dataset/output/taxi/default/")


- repartition(5)
trip_df = spark.read.option("header", "true").csv("./dataset/taxi/part-00000-ddeeda58-424e-43fa-bd31-6057d816f97c-c000.csv")
trip_df = trip_df.repartition(5)
trip_df.write.mode("overwrite").csv("./dataset/output/taxi/default/")


- 사용 예시
- narrow transformation(where 등)를 통해 데이터 변환이 발생한 경우 데이터가 줄어들기 때문에 파티션 별로 메모리 낭비가 발생할 수 있습니다. 이럴경우 repartition을 통해 파티션 조정이 필요할 수 있습니다.
- groupby를 통해 데이터 집계시 데이터 크기가 작아집니다. 이때 파티션 수는 spark.sql.shuffle.partitions 설정된 값에 따라 파일수(파티션수)가 지정되는데 write시 파일 크기를 늘리기 위해 repartition 또는 coalesce를 사용할 수 있습니다.
5. Shuffle Partition
- 관련설정: spark.sql.shuffle.partitions
- 성능에 가장 크게 영향을 미치는 파티션
- join, groupBy 연산을 수행할때 해당 값에 따라 결과에 대한 파티션수가 결정됨

- stage Id : 0 ~ 4 (셔플파티션 200), 5 ~ 9 (셔플파티션 100)
- Default 값은 200이며 DataFrame의 파티션 수에 따라 해당 값 조정을 통해 Shuffle Spill을 최소화 할 수 있음
여기까지 ‘Spark 3편 – Optimize Partition’에 대해 소개해드렸습니다. 유익한 정보가 되셨길 바랍니다. 감사합니다.
Written by 한 제호 / Data실
BESPIN GLOBAL