Spark 3편 – Optimize Partition

안녕하세요 오늘은 BESPIN GLOBAL Data실 한제호님이 작성해주신 ‘Spark 3편 – Optimize Partition’ 에 대해 소개해드리도록 하겠습니다.

목차
1. 파티션의 개념
2. 파티션 활용의 잘못된 사례
3. 파티션 종류
4. Output Partition
5. Shuffle Partition

1. 파티션의 개념

  • 클러스터 전체에 작업을 분산하고 각 노드의 메모리 요구사항을 줄이기 위해 Spark은 데이터를 파티션이라는 더 작은 단위로 분할 합니다.
클러스터 전체에 작업을 분산하고 각 노드의 메모리 요구사항을 줄이기 위해 Spark은 데이터를 파티션이라는 더 작은 단위로 분할 합니다.
  • 하나의 데이터 세트에 대해 각 파티션의 논리적인 집합을 RDD라고 함
  • 파티션 수에 따라 병렬화 정도가 결정됨
  • 1 Core = 1 Task = 1 Partition
    • 파티션 수 → Core 수
    • 파티션 크기 → 메모리 크기
  • 예시
    dataframe은 18개의 파티션으로 구성되어 있으며 5 core로 executor가 구성되어 있어 동시에 5개의 파티션이 실행됩니다.
예시

2. 파티션 활용의 잘못된 사례

  • 데이터 왜곡
    • 핫 파티션 발생
핫 파티션 발생
  • P1 파티션이 P2 및 P3 파티션보다 데이터를 더 많이 가지고 있으므로 전체적은 처리 시간이 지연됨
  • 스케줄링
  • executor 별 보유한 Task 수 대비 파티션 수가 작아서 컴퓨팅 파워 낭비 발생
executor 별 보유한 Task 수 대비 파티션 수가 작아서 컴퓨팅 파워 낭비 발생
  • Executor1의 경우 Executor2에 비해 파티션 수가 많으므로 2배의 시간 소요. Executor2의 경우 낭비되는 Task 발생
  • 해결책
    • 적절한 파티션수 조정을 통해 성능 향상을 가져올 수 있음

3. 파티션 종류

  • Input Partition
    • 관련 설정: spark.sql.files.maxPartitionBytes
    • 파일형태의 데이터를 읽을때 생성되는 파티션이며 위 옵션에 따라 파티션당 최대 사이즈가 결정됩니다.
    • Default: 134217728 (128MB)
    • 단일 파일이 아주 큰 경우 해당 값을 늘려서 파티션 수를 줄일 수 있습니다.
    • 파일이 작은 경우 파일 단위로 파티션이 생성됩니다.
    • 예시
      • 500MB로 설정
spark.conf.set("spark.sql.files.maxPartitionBytes", "524288000") # 500MB
trip_df1 = spark.read.csv("s3://hjh-ap-northeast-2/dataset/tripdate/big_data/part-00000-ddeeda58-424e-43fa-bd31-6057d816f97c-c000.csv")
print(f"partition num: {trip_df1.rdd.getNumPartitions()}") # output -> partition num: 5
500MB로 설정
  • 128MB로 설정
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728") # 128MB
trip_df1 = spark.read.csv("s3://hjh-ap-northeast-2/dataset/tripdate/big_data/part-00000-ddeeda58-424e-43fa-bd31-6057d816f97c-c000.csv")
print(f"partition num: {trip_df1.rdd.getNumPartitions()}") # output -> partition num: 18
128MB로 설정
  • 대부분 Spark 활용 시 해당 값을 조절하는 경우는 거의 없음

4. Output Partition

  • 관련 설정: df.repartition(cnt or column), df.coalesce(cnt)
  • 파일을 저장할 때 생성되는 파티션이며 해당 값에 따라 쓰기시 파일 수가 결정됩니다.
  • HDFS 및 Object Storage에 저장 할때는 큰 파일로 저장하기를 권고합니다.
  • coalesce는 파티션을 효율적으로 줄이는 경우만 사용되는 반면 repartition 파티션을 늘리거나 줄일때 사용가능합니다.
  • 예시
    • 파티션 조정 없이 Write 시
trip_df = spark.read.option("header", "true").csv("./dataset/taxi/part-00000-ddeeda58-424e-43fa-bd31-6057d816f97c-c000.csv")
trip_df.write.mode("overwrite").csv("./dataset/output/taxi/default/")
예시

파티션 조정 없이 Write 시
  • coalesce(5)
trip_df = spark.read.option("header", "true").csv("./dataset/taxi/part-00000-ddeeda58-424e-43fa-bd31-6057d816f97c-c000.csv")
trip_df = trip_df.coalesce(5)
trip_df.write.mode("overwrite").csv("./dataset/output/taxi/default/")
coalesce(5)

coalesce(5)
  • repartition(5)
trip_df = spark.read.option("header", "true").csv("./dataset/taxi/part-00000-ddeeda58-424e-43fa-bd31-6057d816f97c-c000.csv")
trip_df = trip_df.repartition(5)
trip_df.write.mode("overwrite").csv("./dataset/output/taxi/default/")
repartition(5)

repartition(5)
  • 사용 예시
    • narrow transformation(where 등)를 통해 데이터 변환이 발생한 경우 데이터가 줄어들기 때문에 파티션 별로 메모리 낭비가 발생할 수 있습니다. 이럴경우 repartition을 통해 파티션 조정이 필요할 수 있습니다.
    • groupby를 통해 데이터 집계시 데이터 크기가 작아집니다. 이때 파티션 수는 spark.sql.shuffle.partitions 설정된 값에 따라 파일수(파티션수)가 지정되는데 write시 파일 크기를 늘리기 위해 repartition 또는 coalesce를 사용할 수 있습니다.

5. Shuffle Partition

  • 관련설정: spark.sql.shuffle.partitions
  • 성능에 가장 크게 영향을 미치는 파티션
  • join, groupBy 연산을 수행할때 해당 값에 따라 결과에 대한 파티션수가 결정됨
join, groupBy 연산을 수행할때 해당 값에 따라 결과에 대한 파티션수가 결정됨
  • stage Id : 0 ~ 4 (셔플파티션 200), 5 ~ 9 (셔플파티션 100)
  • Default 값은 200이며 DataFrame의 파티션 수에 따라 해당 값 조정을 통해 Shuffle Spill을 최소화 할 수 있음

여기까지 ‘Spark 3편 – Optimize Partition’에 대해 소개해드렸습니다. 유익한 정보가 되셨길 바랍니다. 감사합니다. 

Written by 한 제호 / Data실

BESPIN GLOBAL