메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


* 출처 : http://kysepark.blogspot.kr/2016/04/how-to-tune-your-apache-spark-jobs-part.html


---------------------------------------------------------------------------------------

본 글은 Cloudera 블로그에 게재된 "How-to: Tune Your Apache Spark Jobs (Part 2)"  정리한 글이다.
원문은 아래 url에서 볼 수 있다.

Tuning Resource Allocation

YARN을 Spark의 Cluster Manger로 사용할 때에 대한 내용이다.
Spark의 두 개의 메인 리소스는 cpu와 memory이다. (디스크와 네트워크 I/O도 Spark와 YARN 성능의 한 부분이지만 현재로선 Spark와 YARN 모두 두 부분에 대해서 아무것도 하고 있지 않다.)
모든 Spark executor는 고정된 동일한 core 개수와 memory를 갖는다. Spark 리소스 할당 옵션은 다음과 같다. (spark-submit의 옵션/properties)
  • --executor-cores/spark.executor.cores => executor의 core 개수
  • --executor-memory/spark.executor.memory => executor의 memory 용량
  • --num-executor/spark.executor.instances => executor 개수. CDH 5.4/Spark 1.3부터는 dynamic allocation(spark.dynamicAllocation.enabled)을 사용하여 --num-executor 설정을 하지 않아도 된다. Dynamic allocation은 대기하고 있는 task의 backlog가 있을 경우 executor를 요청하고 idle 상태일 경우엔 executor를 해제한다.

Spark와 YARN의 리소스 관계에 대해서 이해하기 위해서는 YARN의 다음 프로퍼티에 대해 알아야 한다.
  • yarn.nodemanager.resource.memory-mb =>각 노드의 컨테이너들에서 사용되는 memory의 총합의 최대값을 제어
  • yarn.nodemanger.resource.cpu-vcores => 각 노드의 컨테이너들에서 사용되는 core 총개수의 최대값을 제어

5개의 executor core를 요청할 경우, 5개의 virtual core들이 할당되게 된다.
메모리의 경우에는 다음과 같은 이유로 좀 더 복잡하다.
  • --executor-memory/spark.executor.memory는 executor의 힙 사이즈를 제어한다. 
    • 그러나 JVM에서는 힙을 벗어나는 메모리를 사용할 수도 있다. 예를 들어 interned Strings와 direct byte buffer가 있다.
    • 각각의 executor가  YARN에 메모리 요청을 할 때 메모리가 full이 되어있는 지를 알아낼 수 있게 spark.yarn.executor.memoryOverhead 프로퍼티의 값이 executor memory에 추가되었다. 
      • 이 프로퍼티의 디퐅트 값은 max(384, .07*spark.executor.memory)이다.
  • YARN은 요청되는 메모리에서 소수점 이하 값을 올림 처리한다.
    • yarn.scheduler.minimum-allocation-mb와 yarn.scheduler.increment-allocation-mb 프로퍼티들은 각각 메모리 요청에 대한 최소 값과 증가  값을 제어한다.
다음 그림은 Spark와 YARN의 메모리 프로퍼티의 hierarchy를 보여준다.

Spark executor의 메모리 사이징에 대한 위와 같은 내용이 고려될 사항으로 부족하면  마지막 고려사항이 몇 가지 있다.
  • Application master(짧게 AM)는 YARN으로부터 컨테이너를 요청할 수 있는 executor가 아닌 컨데이너로 자신만의 리소스를 사용한다. 
    • yarn-client 모드에선 디폴트로 1024MB와 1개의 vcore를 사용한다.
    • yarn-cluster 모드에선 application master는 driver를 실행한다. 따라서 --driver-memory와 --driver-cores  프로퍼티로 리소스를 늘리는데 용이하다.
  • 너무 많은 메모리를 executor에 주고 실행 시, 과도한 garbage collection 딜레이가 발생할 수 있다. 
    • 하나의 executor 메모리의 적당한 상한선으로 64GB라고 러프하게 잡을  수 있다.
  • HDFS client가 수많은 concurrent thread들로 인해 문제가 발생하는 것을 보았다. 
    • 추측컨대, 최대 executor당 5개의 task가 full write throughput을 얻을 수 있다.
    • 그러므로 executor 당 코어 개수를 5개 이하로 주는게 좋다.
  • 작은 executore들(예를 들어, 코어 한 개에 한 개의 task를 실행할 수 있을 정도의 메모리를 가진)은 한 개의 JVM 위에서 복수의 task들을 실행하여 얻을 수 있는 장점을 살릴 수 없다.
    • 변수 값을 broadcast 시 각 executor마다 하나씩 복제해야한다. 따라서 작은 executor가 많을 수로 데이터 복제가 많이 생겨나게 된다.

위의 내용을 좀 더 구체적인 예를 들어 설명하겠다.
다음과 같은 환경이 있다고 가정해보자.
------------------------------------------------
클러스터 => 6 노드
노드 당 코어 개수 => 16개
노드 당 메모리 => 64GB
------------------------------------------------
위와 같은 환경일 경우 yarn.nodemanager.resource.memory-mb는 63 * 1024 = 64512(megabytes), yarn.nodemanager.resource.cpu-vcores는 15개로 각각의 노드에 설정되어야 한다.
노드의 OS와 Hadoop 데몬들을 실행할 수 있는 리소스가 필요하기 때문에 YARN 컨테이너에 리소스를 100%할당하는 걸 피한다. 이런 경우에는 시스템 프로세스에 1G 메모리와 코어 1개를 남겨둔다.
Cloudera Manager는 이런 YARN 프로퍼티 설정을 자동으로 계산하고 설정할 수 있게 도와준다.
그러면 맨 처음 Spark 리소스 할당을 "--num-executor 6 --executor-core 15 --executor-memory 63G"로 할 수 있을 걸로 생각된다.
하지만 이는 잘못된 설정이다.
  • 63GB + executor memory overhead를 NodeManager의 63GB 용량내로 수용할 수가 없다.
  • AM에서 노드의 코어를 1개 사용하게 된다. 그러면 15개의 코어를 가진 executor 하나를 수용할 공간이 없다는 얘기이다.
  • Executor당 15개의 코어는 HDFS I/O throughput을 떨어지게 만든다.
좀 더 나은 선택은 "--num–executor 17 --executor-core 5 --executor-memory 19G"와 같다. 왜냐?
  • 이 설정은 AM과 함께 있는 하나의 노드(2개의 executor를 가짐)를 제외한 모든 노드가 3개의 executor를 갖게 한다. (총 18개의 코어를 사용)
  • --executor-memory 값은 이렇게 나온다. (63/3 executors per node) = 21.  21 * 0.07 = 1.47.   21 - 1.47 ~ 19.  (즉 수용가능 메모리 용량에서 spark.yarn.executor.memoryOverhead 값을 구해 이를 빼준 메모리 사이즈)

===============================================================================
위와 같은 내용을 바탕으로 예제를 만들어 리소스 설정 값을 구해보겠다.
현재 서버 3대로 서비스 구성을 한다고 가정해보자. 대략적인 구성은 다음과 같다.

클러스터 => 3 노드
노드 당 코어 개수 => 32개
노드 당 메모리 => 250GB

위와 같은 구성이 있을 경우 기본적으로 노드의 시스템에서 사용하는 cpu와 메모리를 빼어 YARN 설정을 하면
yarn.nodemanager.resource.memory-mb => 249GB
yarn.nodemanager.resource.cpu-vcores => 31개
와 같다. 

3개 노드의 코어 개수를 모두 더하면 93개이다. 여기서 AM에서 사용할 코어 개수 하나를 빼면 가용할 수 있는 코어의 개수는 92개이다.
코어의 개수를 5개로 하면 18개의 executor를 사용할 수 있다. 그러면 AM 코어 한 개를 더해 총 91개의 코어를 사용하게 된다.
spark.yarn.executor.memoryOverhead 값을 구하면 => 249 / 18 = 13.83.(소수점 이하 내림으로 13)  13 * 0.07  = 0.91. (소수점 이하 올림으로 1)
그러면 executor 당 사용 가능한 메모리는 13 - 1 = 12GB
따라서 다음과 같은 리소스 할당 설정을 얻을 수 있다.
--num-executor 18 --executor-core 5 --executor-memory 12G
===============================================================================

Tuning Parallelism

Spark는 parallel processing engine이다. 그러나 Spark는 마법을 쓰는 parallel processing engine이 아니다. 최적의 병렬처리를 파악하기 위한 제한된 능력을 가지고 있을 뿐이다.
모든 Spark stage는 데이터를 sequential하게 처리하는 task의 개수를 가지고 있다. Spark 작업에서 이 개수는 성능을 결정짖는 하나의 중요한 파라미터이다.

어떻게 이 개수를 결정하는가? Spark Group RDD가 stage에 포함되어지는 방식은 part1에서 설명하였다. (reminder: repartition과 reduceByKey는 stage의 경계선을 만든다.)
Stage내 RDD 파티션의 개수는 해당 RDD가 의존하고 있는(부모RDD) 파티션의 개수와 같다. 몇 가지 예외로 coalesce transformation은 부모RDD보다 파티션의 개수를 줄여서 RDD를 생성한다.
union은 부모들의 파티션 개수들의 합으로 이뤄진 RDD를 생성한다. cartesian은 RDD들의 곱의 개수만큼의 파티션을 가진 RDD를 생성한다.

부모가 없는 RDD는 어떨까? textFile이나 hadoopFile로 생성된 RDD는 MapReduce InputFormat에 사용된  인자에 의해 파티션 개수가 결정된다. 일반적으로 각 HDFS블락으로 이뤄진 파티션이 있고 이를 읽어서 사용한다.(hadoopFile 사용 시, HDFS에 파티션별로 나눠진 파일대로 파티션 수가 생성된다는 얘기인듯)
parallelize를 통해 생성된 RDD의 파티션 개수는 사용자의 설정 또는 spark.default.parallelism 프로퍼티 값을 인자로 사용해 생성된다.

RDD의 파티션 개수를 확인하고 싶으면 언제든지 rdd.partition().size() 를 호출하여 확인할 수 있다.

주요 고려사항은 task의 수가 너무 적을 경우이다. 작업을 실행 시킬 수 있는 슬롯보다 task의 개수가 적을 때, stage는 cpu를 충분히 활용할 수 없게 된다.
적은 task 수는 각 task에서 발생되는 aggregation 실행에서 더 많은 메모리 부담을 주게된다.
join, cogroup, *ByKey 함수들의 실행은 hashmap이나 in-memory buffer를 사용하는 데이터 구조를 가진다. 이 데이터 구조에 해당 객체를 저장하고 group 이나 sort 작업을 한다.
join, cogroup, groupByKey 함수들은 이와 같은 데이터 구조를 stage내의 task에서 shuffle 시, fetching을 위한 부분에서 사용한다.(데이터 fetch 기능 측면으로서의 셔플을 의미하는 듯 함)
reduceByKey와 aggregateByKey 함수는 이와 같은 데이터 구조를 stage의 task에서 shuffle 시, 두 가지 측면(group, sort) 모두를 위해 사용한다.(데이터의  그룹화와 정렬 측면으로서의 셔플을 의미하는 듯 함)

어떤 혼란으로 인해 aggregation 을 할 때에 메모리 크기에 데이터가 안맞을 때가 있다. 첫 째, 이럴 경우 해당 데이터 구조에 많은 데이터를 가지고 있게 되어 garbage collection 부담이 늘게 된다. 이럴 경우 실행이 중지될 수 있다.
두 번째, 데이터가 메모리 크기에 맞지 않을 때, Spark는 이를 디스크에 분할하여 저장한다. 그렇게 되면 disk I/O와 sorting을 발생시키게 된다. 크게 shuffle을 하고 있는 중의 이러한 overhead는 지금껏 Cloudera 고객들을 봤을 때, 작업이 중지되는 가장 큰 원인이었다. 

그러면 어떤 방법으로 파티션의 개수를 늘려줘야 할까? 다음과 같은 옵션이 있다.
  • repartition을 사용하라. 이 함수는 shuffle을 발생시킨다.
  • InputFormat 설정 시, 좀 더 많은 분할이 발생되도록 하라.
  • 작은 블록 사이즈로 입력 데이터를 HDFS에 쓰라.(HDFS에 파일을 쓸 때를 의미함)

어떤 stage가 다른 stage로부터 입력 데이터를 받을 때, stage의 경계선을 만드는 transformation은 numPartition 인자를 다음과 같이 받는다.
val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)

"X"는 어떤 값이 될까? 가장 간단한 방법은 다음 실험을 통해 파티션 개수를 구하는 것이다.
=>부모RDD의 개수를 확인한 후, 이 수를 성능 향상이 멈출 때까지 계속하여 1.5배를 곱한다.

X를 계산하는 좀 더 원리적인 방법도 있다. 그러나 이는 몇몇의 수에 대해서 계산하기 어렵기때문에 a priori(시스템 기술 등을 종합적으로 검토하고 선험성()을 높이는 것)를 적용하기 어렵다. 
이 방법을 추천하기 위해 여기에 포함시키는 건 아니고 어떤 방식인지에 대한 이해를 돕기 위해 여기 해당 방법을 포함시킨다. 주요 목적은 가용가능한 메모리에 맞게 각 task로 데이터를 할당하여 동작시킴에 있다.

Task에 가용가능한 메모리는 (spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores 이다. Memory fraction은 0.2, safety fraction은 0.8이다.
총 shuffle 데이터의 in-memory 크기는 알아내기 여렵다. 이에 가장 근접한 휴리스틱 방법은 해당 stage에서 동작하는 Shuffle Spill(Memory) metric과 Shuffle Spill(Disk) 사이의 ratio를 구하는 방법이다.
그리고 이 숫자를 총 shuffle write와 곱한다. 그러나 이는 해당 stage가 reduction을 실행 중일 때 복잡한 양상을 보일 수 있다.
계산된 값은 소수점 이하 값을 올림 처리한다. 왜냐하면 과다한 파티션이 과소한 파티션보다 낫기 때문이다.

Slimming Down Your Data Structures

Data flow는 deserialized java object representation과 serialized binary representation으로 사용될 수 있다.
일반적으로 Spark는 전자를 사용한다. 성능을 위해서라면 후자를 사용하고 관련 라이브러리로 Kyro를 사용하라.

Data Formats

Avro, Parquet, Thrift, Protobuf와 같은 extensible binary format을 사용하라.
그 중에 하나를 선택하여 사용을 고수하라.
번호 제목 날짜 조회 수
162 Flume과 Kafka를 사용한 초당 100만개 로그 수집 테스트 file 2016.10.31 1419
161 Flume을 이용한 데이타 수집시 HBase write 성능 튜닝 file 2016.10.31 811
» How-to: Tune Your Apache Spark Jobs (Part 2) file 2016.10.31 478
159 VisualVM 1.3.9을 이용한 spark-submit JVM 모니터링을 위한 설정및 spark-submit실행 옵션 2016.10.28 2414
158 운영중인 상태에서 kafka topic삭제하고 재생성하여 처리되지 않은 메세지 모두 삭제하기 2016.10.24 1240
157 producer / consumer구현시 설정 옵션 설명 2016.10.19 777
156 java.lang.OutOfMemoryError: unable to create new native thread오류 발생지 조치사항 2016.10.17 1204
155 AIX 7.1에서 hive실행시 "hive: line 86: readlink: command not found" 오류가 발생시 임시 조치사항 2016.09.25 1014
154 hive기동시 Caused by: java.net.URISyntaxException: Relative path in absolute URI: ${system:java.io.tmpdir%7D/$%7Bsystem:user.name%7D 오류 발생시 조치사항 2016.09.25 1054
153 schema설정없이 hive를 최초에 실행했을때 발생하는 오류메세지및 처리방법 2016.09.25 1492
152 파일끝에 붙는 ^M 일괄 지우기(linux, unix(AIX)) 혹은 파일내에 있는 ^M지우기 2016.09.24 238
151 AIX 7.1에 MariaDB 10.2 소스 설치 2016.09.24 2938
150 ./hadoop-daemon.sh start namenode로 namenode기동시 EditLog의 custerId, namespaceId가 달라서 발생하는 오류 해결방법 2016.09.24 506
149 hadoop 어플리케이션을 사용하는 사용자 변경시 바꿔줘야 하는 부분 2016.09.23 960
148 format된 namenode를 다른 서버에서 다시 format했을때 오류내용 2016.09.22 331
147 AIX 7.1에 Hadoop설치(정리중#2) 2016.09.20 376
146 AIX 7.1에 Hadoop설치(정리중) 2016.09.12 972
145 No broker partitions consumed by consumer thread오류 발생시 확인/조치할 사항 2016.09.02 1018
144 kafka 0.9.0.1버젼의 producer와 kafka버젼이 0.10.0.1인 consumer가 서로 대화하는 모습 2016.08.18 429
143 down된 broker로 메세지를 전송하려는 경우의 오류 내용및 조치사항 2016.08.12 410
위로