RDD는 스파크가 사용하는 기본 데이터 모델로서 RDD를 잘 이해하고 다루는 것은 스파크 어플리케이션을 작성할 때 가장 중요한 기초라고 할 수 있다.

RDD를 다루기 전 알아둬야할 중요한 사항
- 스파크 클러스터
- 분산 데이터로서의 RDD
- RDD의 불변성
- 파티션
- HDFS
- Job과 Executor
- 드라이버 프로그램
- 트랜스포메이션과 액션
- 지연(lazy) 동작과 최적화
- 함수의 전달

o 데이터 타입에 따른 RDD 연산
RDD는 데이터 처리에 유용한 다양한 연산을 수행할 수 있는 메소드를 제공한다. RDD가 제공하는 메소드는 데이터 타입과 밀접한 관계를 맺고 있다. 따라서 스파크에서 자주 사용되는 특별한 데이터 유형에 대해 좀 더 특화된 메소드를 제공할 수 있도록 클래스를 제공하고 있다. 
- PairRDDFunctions : 키와 값의 형태로 구성된 데이터 연산을 제공하는 클래스
  groupBykey나 reduceByKey, mapValues와 같이 키를 사용하는 유용한 연산을 제공한다.
- OrderedRDDFunctions : 키와 값으로 구성된 데이터를 대상으로 하며, 키에 해당하는 변수 타입이 정렬 가능한 경우

  를 위한 것이다. sortBykey와 같은 연산을 제공한다.
- DoubleRDDFunctions : double 유형의 데이터를 위한 것으로 합을 구하는 sum()이나 평균을 구하는 mean()과 같은 

  연산을 제공한다.
- SequenceFileRDDFunctions : 하둡의 시퀀스 파일을 다루기 위한 연산을 제공한다.

o 스파크컨텍스트 생성
스파크컨텍스트는 스파크 애플리케이션과 클러스터의 연결을 관리하는 객체로서 모든 스프크 애플리케이션은 반드시
스파크컨텍스트를 생성해야 한다. RDD를 비롯한 스파크에서 사용하는 주요 객체는 스파크컨텍스트를 이용해 생성 할 
수 있다.

val conf = new SparkConf().setMaster("local[*]").setAppName("RDDCreateSample")
val sc = new SparkContext(conf)

o RDD 생성
Spark는 두 종류의 RDD 생성 방법을 제공한다.
- 드라이버 프로그램의 컬렉션 객체를 이용
- 데이터베이스와 같은 외부 데이터를 읽어서 새로운 RDD를 생성

val rdd1 = sc.paralleize("a","b","c","d")   // 컬렉션 객체를 이용한 RDD 생성
val rdd2 = sc.parallelize(1 to 1000, 10)  // 파티션 개수 지정 
val rdd3 = sc.textfile("/README.md")    // 외부파일을 이용한 RDD 생성 

RDD가 제공하는 연산은 트랜스포메이션과 액션으로 나눌 수 있다. 두 연산을 구분하는 기준은 연산의 결과가 RDD인지 아닌지에 달려 있다. 트랜스포메이션은 기존 RDD를 이용해 새로운 RDD를 생성하는 연산이다.

o 클러스터 환경의 공유변수
하둡이나 스파크와 같이 클러스터 환경에서 동작하는 애플리케이션은 하나의 잡(Job)을 수행하기 위해 클러스터에
속한 다수의 서버에서 여러 개의 프로세스를 실행하게 되므로 모든 프로세스가 공유할 수 있는 자원을 관리하기란
쉽지 않다.
따라서 이러한 프레임워크들은 다수의 프로세스가 공유할 수 있는 읽기 자원과 쓰기 자원을 설정할 수 있또록 지원하는데
하둡은 분산캐시와 카운터를, 스파크는 브로드캐스드 변수(Broadcast Variables)와 어큐뮬레이터(Accumulators)를 제공한다.

o broadcast 변수
스파크의 잡이 실행되는 동안 클러스터 내의 모든 서버에서 공유할 수 있는 일기전용 자원을 설정할 수 있는 변수
예를 들어 사용자ID목록이 담긴 세트 컬렉션 타입의 데이터를 공유 변수로 설정해 각 서버에서 로그를 처리하며서 현재 처리
하려는 로그가 우리가 찾고 이쓴 사용자의 로그가 맞는지 확인하는 등의 용도로 사용할 수 있다.

먼저 공유 객체를 생성하고, 스파크컨텍스트의 broadcast 메소드의 인자로 등록한다.
이렇게 생성된 브로드캐스트 변수의 value() 메소드를 통해 접근한다.

val broadcastUsers = sc.broadcast(Set("u1","u2"))
val rdd = sc.parallelize(List("u1","u2","u3","u4","u5","u6"),3)
val result = rdd.filter(broadcastUsers.value.contains(_))

o accumulator 변수
브로드캐스트 변수가 읽기 동작을 위한 것이라면 어큐뮬레이터는 쓰기 동작을 위한 것이다. 
예를 들어 다수의 서버로 구성된 클러스터 환경에서 동작하는 프로그램인 경우 오류가 발생했을 때 이 문제가 어느 서버의 어떤
프로세스에서 발생한 것인지 확인하는 것이 쉽지 않다. 따라서 클러스트의 각 서버에서 보내는 에러 정보를 한 곳에 모아서 
볼 수 방법이 있다면 손쉽게 에러 상황을 디버깅할 수 있을 것이다.
어큐뮬레이터는 이처럼 클러스터 내의 모든 서버가 공유하는 쓰기 공간을 제공함으로써 각 서버에서 발생하는 특정 이벤트의 수를
세거나 관찰하고 싶은 정보를 모아 두는 등의 용도로 편리하게 활용할 수 있다.

어큐뮬레이터를 생성하려면 ora.apache.spark.util.AccumulatorV2 클래스를 상속받은 클래스를 정의하여 인스턴스를 생성해야 한다.
그리고 생성한 인스턴스를 스파크 컨텍스트에서 제공하는 register() 메소드를 이용하여 등록한다.

스파크에서는 자주 사용되는 몇 가지 데이터 타입에 대한 어큐뮬레이터를 미리 정의해 뒸으며 이를 편리하게 사용할 수 있게 생성과
등록 작업을 한번에 처리하는 간단한 메소드를 컨텍스트를 통해 제공한다.
- long Accumulator()
- collectionAccumulator()
- doubleAcculator()

val acc1 = sc.longAccumulator("invalidFormat")
val acc2 = sc.collectionAccumulator("invalidFormat2")
val data = List("U1:Addr1","U2:Addr2","U3","U4:Addr4","U5;Addr5","U6:Addr6","U7::Addr7")
sc.parallelize(data,3).foreach{ v =>
  if(v.split(":").length !=2){
    acc1.add(1L)
    acc2.add(v)
  }
}

자체 어큐뮬레이터를 정의하려면 org.apache.spark.util.AccumulatorV2 추상 클래스를 상속받고 isZero(), copy() 등의 필요한
메소드를 정의해야 한다. 이때 AccumulatorV2[Record, Long]에서 각각 입력 데이터 타입과 출력 데이터 타입을 의미한다.
isZero()의 경우 RecordAccumulator에 포함된 Record의 값이 초깃값에 해당하는지 여부를 확인하기 위한 것으로, 초깃값이 맞으면
true, 아니면 false를 돌려준다.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.util.AccumulatorV2

class RecordAccumulator extends AccumulatorV2[Record, Long]{
  private var _record = Record(0)
  def isZero:Boolean = _record.amount == 0 && _record.number==1
  def copy():AccumulatorV2[Record, Long] = {
    val newAcc = new RecordAccumulator
    newAcc._record = Record(_record.amount, _record.number)
    newAcc
  }
  df reset(): Unit = {
    _record.amount = 0L
    _record.number = 1L
  }
  def add(other: Record): Unit = {
    _record.add(other)
  }
  def merge(other: AccumulatorV2[Record, Long]): Unit = other match {
    case o :  RecordAccumulator => _record.add(o._record)
    case _ => throw new RuntimeException    
  }
  def value: Long = {
    _record.amount
  }
}

val acc = new RecordAccumulator
sc.register(acc, "invalidFormat")
val data = List("U1:Addr1","U2:Addr2","U3","U4:Addr4","U5;Addr5","U6:Addr6","U7::Addr7")
sc.parallelize(data,3).foreach{ v =>
  if(v.split(":").length !=2){
    acc.add(Record(1))
  }
}

어큐뮬레이터를 증가시키는 동작은 클러스터의 모든 데이터 처리 프로세스에서 가능하지만 데이터를 읽는 동작은
드라이버 프로그램 내에서만 가능하다. 즉 RDD의 트랜스포메이션이나 액션 연산 내부에서는 어큐뮬레이터의 값을
증가시킬 수만 있을 뿐 그 값을 참조해서 사용하는 것은 불가능하다.
일부러 의도한 특별한 목적이 없는 한 어큐뮬레이터는 액션 연산을 수행하는 메소드에서만 사용해야 한다.
그 이유는 스파크의 트랜스포메이션 연산은 액션 연산과 달리 하나의 잡 내에서 필요에 따라 수차례 반복 실행될
수 있기 때문이다. 따라서 map()이나 flatMap()과 같은 트랜스포메이션 연산 내용에 어큐뮬레이터의 값을 증가
시키는 코드가 포함될 경우 정확하지 않은 데이터가 집계될 수 있다.

[참고] 위키북스 스파크2 프로그래밍 을 정리한 내용임

'Spark > Spark2Programming' 카테고리의 다른 글

Spark SQL  (0) 2019.12.04
Spark RDD Transformation  (0) 2019.11.11
Spark RDD Action  (0) 2019.11.11

+ Recent posts