트랜스포메이션은 기존 RDD를 이용해 새로운 RDD를 생성하는 연산이다.
각 요소의 타입을 문자열에서 숫자로 바꾸거나 불필요한 연산을 제외하거나 기존 요소의 값에 특정 값을 더하는 등의
작업이 모두 포함된다. 자주 사용하는 연산은 맵연산, 그룹화 연산, 집합 연산, 파티션 연산, 필터와 정렬 연산 등이다.
맵(Map) 연산 : 요소간의 매핑을 정의한 함수를 RDD에 속하는 모든 요소에 적용해 새로운 RDD를 생성
그룹화 연산 : 특정 조건에 따라 요소를 그룹화하거나 특정 함수를 적용
집합 연산 : RDD에 포함된 요소를 하나의 집합으로 간주할 때 서로 다른 RDD 간에 합집합, 교집합 등을 계산
파티션 연산 : RDD 파티션 개수를 조정
필터와 정렬 연산 : 특정 조건을 만족하는 요소만 선택하거나 각 요소를 정해진 기준에 따라 정렬
[맵 연산]
o map
스파크를 이용한 데이터 처리 작업에서 흔히 사용되는 대표적인 연산 중 하나이다.
map() 메소드가 인자값으로 함수 하나를 전달받아 RDD에 속하는 모든 요소에 적용한 뒤 그 결과값으로 구성된
새로운 RDD를 생성하여 반환한다.
val rdd_map = rdd.map(_+1)
print(rdd_map.collect.mkString(" "))
o flatMap
flatMap() 메소드는 map() 메소드와 유사하게 동작한다.
두 메소드 간의 가장 큰 차이점은 인자로 전달되는 함수가 반환하는 값의 타입이 다르다는 점이다.
map API : map[U](f:(T) => U) : RDD[U]
flatMap API : flatMap[U](f:(T) => TraversableOnce[U]) : RDD[U]
눈여겨 봐야할 부분은 두 메소드의 인자로 전달되고 있는 f라는 함수가 반환하는 값의 타입이다.
f 함수를 보면 map()에서는 U타입의 출력값을 돌려주는 반면 flatMap()에서는 TraversableOnce[U]라는 타입의
출력값을 돌려주고 있다.
U라는 문자는 어떤 값이든 가능하다는 뜻으로 map() 메소드의 경우 인자로 사용하는 함수의 반환 타입에 제약이
없지만 flatMap() 메소드의 경우는 일정한 규칙을 따라야 한다.
TraversableOnce는 스칼라에서 사용하는 이터레이션(Iteration) 타입 중 하나이다. 따라서 이 선언의 의미는 flatMap에 사용하는 함수 f는 반환값으로 리스트나 시퀀스 같은 여러 개의 값을 담은 일종의 컬렉션과 유사한 타입의 값을 반환해야 한다.
val fruits = List("apple,orange", "grape,apple,mango", "blueberry,tomato,orange")
val rdd1 = sc.parallelize(fruits)
val rdd2 = rdd1.map(_.split(","))
println(rdd2.collect().map(_.mkString("{", ", ", "}")).mkString("{", ", ", "}"))
val rdd2 = rdd1.flatMap(_.split(","))
print(rdd2.collect.mkString(", "))
map()의 정의 f:(T)=>U 에서 T가 문자열이고 U가 배열이므로 결과값의 타입은 RDD[배열]이다.
각 배열 속에 포함된 요소를 모두 배열 밖으로 끄집어내는 작업을 위해서는 flatMap()을 사용해야 한다.
하나의 입력값에 대응하는 반환값이 여러 개 일 때 유용하게 사용할 수 있다.
val rdd2 = rdd1.flatMap( log => {
// apple이라는 단어가 포함된 경우만 처리
if(log.contains("apple"){
Some(log.indexOf("apple"))
} else{
None
}
})
Some과 None은 값이 있거나 없을 수 있는 옵션 상황을 표시하는 스칼라 타입이다.
flatMap()에서 요구하는 TraversableOnce 타입은 아니지만 내부적인 반환에 의해 Some은 값이 있는 집합,
None은 값이 없는 집합처럼 취급된다. 따라서 최종 결과로 모든 집합의 값을 하나로 만들면 Some에 포함된
들만 남게 된다.
o mapPartitions
map과 flatMap 메소드가 RDD의 각 요소를 하나씩 처리한다면 mapPartitions()는 파티션 단위로 처리한다.
인자로 전달받은 함수를 파티션 단위로 적용하고, 그 결과로 구성된 새로운 RDD를 생성한다.
파티션 단위로 파티션에 속한 모든 요소를 한 번의 함수 호출로 처리할 수 있기 때문에 파티션 단위의 중간
산출물을 만들거나 데이터베이스 연결과 같은 고비용의 자원을 파티션 단위로 공유해 사용할 수 있다는 장점이 있다.
val rdd1 = sc.parallelize(1 to 10, 3)
val Wrdd2 = rdd1.mapPartitions(numbers => {
print("DB 연결")
numbers.map { number => number + 1 }
})
numbers는 각 파티션에 담긴 요소를 가리키는 것으로 실제로는 각 파티션 내의 요소에 대한 이터레이터(Iterator)이
다. 따라서 mapPartions()에 사용되는 매핑용 함수는 각 파티션 요소에 대한 이터레이터를 전달받아 함수 내부에서
파티션의 개별 요소에 대한 작업을 처리하고 그 결과를 다시 이터레이터 타입으로 되돌려줘야 한다.
o mapPartitionsWithIndex
mapPartitionsWithIndex는 인자로 전달받은 함수를 파티션 단위로 적용하고 그 결과값으로 구성된 새로운 RDD를
생성하는 메소드이다. 유사한 이름의 mapPartitions()와 다른 점은 인자로 전달받은 함수를 호출할 때 파티션에 속한
요소이 정보뿐만아니라 해당 파티션의 인덱스 정보도 함께 전달해 준다.
다음은 파티션 번호(idx) 를 이용해 첫 번째 파티션에서만 결과를 추출한다.
val rdd1 = sc.parallelize(1 to 10, 3)
val rdd2 = rdd1.mapPartitionsWithIndex((idx, numbers) => {
numbers.flatMap {
case number if idx == 1 => Option(number + 1)
case _ => None
}
})
o mapValues
RDD 요소가 키와 값의 쌍으로 이루고 있는 경우 PairRDD라는 용어를 사용한다.
페어RDD에 속하는 데이터는 키를 기준으로 작은 그룹들을 만들고 해당 그룹들에 속한 값을 대상으로 합계나
평균을 구하는 등의 연산을 수행한다.
mapValues()는 RDD의 모든 요소들이 키와 값의 쌍을 이루고 있는 경우메만 사용 가능한 메소드이며,
인자로 전달받은 함수를 값에 해당하는 요소에만 적용하고 그 결과로 새로운 RDD를 생성한다.
즉 키에 해당하는 부분은 그대로 두고 값에만 map()연산을 적용한 것과 같다.
val rdd = sc.parallelize(List("a", "b", "c")).map((_, 1))
val result = rdd.mapValues(i => i + 1)
println(result.collect.mkString("\t"))
o flatMapValues
flatMapValues는 RDD의 구성요소가 키와 값으로 이루어진 경우에만 사용할 수 있다.
mapValues()와 마찬가지로 키는 그대로 두고 값에 해당하는 요소만을 대상으로 flatMap() 연산을 적용한다.
val rdd = sc.parallelize(Seq((1,"a,b"),(2,"a,c"),(3,"d,c")))
val result = rdd.flatMapValues(_.split(","))
println(result.collect.mkString(" "))
[그룹화 연산]
o zip
두 개의 서로 다른 RDD를 각 요소의 인덱스에 따라 하나의(키,값)으로 묶는다.
첫 번째 RDD의 n번째 요소를 키로 하고 두 번째 RDD의 n번째 요소를 값으로 하는순서쌍을 생성한다.
두 RDD는 파티션의 개수와 각 파티션에 속하는 요소의 수가 동일하다고 가정한다.
val rdd1 = sc.parallelize(List("a","b","c"))
val rdd2 = sc.parallelize(List(1,2,3))
val result = rdd1.zip(rdd2)
o zipPartitions
zip() 메소드가 두 개의 서로 다른 RDD 요소를 쌍으로 묶어주는 역할만 수행했다면 zipPartitions()는 파티션
단위로 zip() 연산을 수행하고 특정 함수를 적용해 그 결과로 구성된 새로운 RDD를 생성한다.
zip과 zipPartitions의 중요한 차이점은 zipPartitions()은 요소들의 집합 단위로 병합을 실행하므로 파티션의
개수만 동일해도 된다. 두 번째로 zip()은 인자로 최대 1개의 RDD만 지정하지만 zipPartitions()은 최대 4개까지
지정할 수 있다. 마지막으로 zip()은 단지 두 RDD 요소의 쌍을 만들기만 하지만 zipPartitions()은 병합에 사용할
함수를 인자로 전달받아 사용할 수 있다.
val rdd1 = sc.parallelize(List("a","b","c"),3)
val rdd2 = sc.parallelize(List(1,2,3),3)
val result = rdd1.zipPartitions(rdd2){
(it1, it2) => for{
v1 <- it1
v2 <- it2
} yield v1+v2
}
o groupBy
RDD의 요소를 일정한 기준에 따라 여러 개의 그룹으로 나누고 이 그룹으로 구성된 새로운 RDD를 생성한다.
각 그룹은 키와 그 키에 속한 요소의 시퀀스로 구성되며, 메소드의 인자로 전달하는 함수가 각 그룹의 키를
결정하는 역할을 담당한다.
val rdd = sc. parallelize(1 to 100)
val result = rdd.groupBy{
case i:Int if(i % 2==0) => "even"
case _ => "odd"
}
result.collect.foreach{
v => println(s"${v._1}, [$v._2.mkString(",")]")
}
result.foreach{ v => println(s"${v._2}")}
o groupBykey
groupByKey()는 키를 기준으로 같은 키를 가진 요소들로 그룹을 만들고 이 그룹들로 구성된 새로운 RDD를 생성한다.
이때 각 그룹은 groupBy()와 같이 키와 그 키에 속한 요소의 시퀀스로 구성된다.
groupBy() 메소드가 요소의 키를 생성하는 작업과 그룹으로 분류하는 작업을 동시에 수행한다면 groupByKey()는
이미 RDD의 구성요소가 키와 값의 쌍으로 이뤄진 경우에 사용 가능한 메소드이다.
val rdd = sc.parallelize(List("a","b","c","b","c")).map((_,1))
val result = rdd.groupByKey
result.collect.foreach {
v => println(s"${v._1}, [${v._2.mkString(",")}]")
}
o cogroup
RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에만 사용할 수 있는 메소드이다.
여러 RDD에 같은 키를 갖는 값 요소를 찾아서 키와 그 키에 속하는 요소의 시퀀스로 구성된 튜플을 만들고,
그 튜플들로 구성된 새로운 RDD를 생성한다.
val rdd1 = sc.parallelize(List(("k1","v1"),("k2","v2"),("k1","v3")))
val rdd2 = sc.parallelize(List(("k1","v4"),("k2","v3")))
val result = rdd1.cogroup(rdd2)
result.collect.foreach {
case (k, (v_1, v_2)) =>
println(s"($k, [${v_1.mkString(",")}], [${v_2.mkString(",")}])")
}
cogroup의 실행결과 생성된 result RDD는 rdd1과 rdd2의 전체 키의 개수와 동일한 두 개의 요소를 가진다.
result의 각 요소는 튜플로 구성되는데 튜플의 첫번째 요소는 키를 의미하고 두 번째 요소는 그 키에 속하는
값들의 시퀀스를 요소로 하는 또 다른 튜플로 구성된다.
이때 rdd1 요소 중에서 해당 키에 속하는 값들의 시퀀스가 첫 번째, rdd2의 요소의 값들은 두 번째에 위치한다.
[집합 연산]
o distinct
RDD 요소 중에서 중복을 제외한 요소로만 구성된 새로운 RDD를 생성한다.
val result = rdd.distinct()
println(result.collect.mkString(","))
o cartesian
두 RDD 요소의 카테시안곱을 구하고 그 결과를 요소로 하는 새로운 RDD를 생성한다.
val rdd1 = sc.parallelize(List((1,2,3)
val rdd2 = sc.parallelize(List("a","b","c"))
val result = rdd1.cartesian(rdd2)
o substract
rdd1과 rdd2라는 두 개의 RDD가 있을 때 rdd1에는 속하는 rdd2에는 속하지 않는 요소로 구성된
새로운 RDD를 생성한다.
val rdd1 = sc.parallelize(List("a","b","c","d","e"))
val rdd2 = sc.parallelize(List("d","e"))
val result = rdd1.substract(rdd2)
o union
rdd1과 rdd2라는 두 개의 RDD가 있을 때 rdd1 또는 rdd2에 속하는 요소로 구성된 새로운 RDD를 생성한다.
val rdd1 = sc.parallelize(List("a","b","c"))
val rdd2 = sc.parallelize(List("d","e"))
val result = rdd1.union(rdd2)
o intersection
rdd1과 rdd2라는 두 개의 RDD가 있을 때 rdd1과 rdd2에 동사에 속하는 요소로 구성된 새로운 RDD를 생성한다.
intersection()의 결과로 생성된 RDD에는 중복된 원소가 존재하지 않는다.
val rdd1 = sc.parallelize(List("a","a","b","c"))
val rdd2 = sc.parallelize(List("a","c","c","e"))
val result = rdd1.intersection(rdd2)
o join
RDD의 요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
두 RDD에서 서로 같은 키를 가지고 있는 요소를 모아서 그룹을 형성하고, 이 결과로 구성된 새로운 RDD를 생성한다.
데이터베이스의 조인과 같은 유사한 동작을 수행한다.
val rdd1 = sc.parallelize(List("a","b","c","d","e")).map(_, 1)
val rdd2 = sc.parallelize(List("b","c")).map(_, 2)
val result = rdd1.join(rdd2)
o leftOuterJoin, rightOuterJoin
RDD의 구성 요소가 키와 값으로 구성된 경우 사용할 수 있다.
rdd1과 rdd2라는 두 개의 RDD가 있을 때 두 메소드는 이름 그대로 왼쪽 외부 조인과 오른쪽 외부 조인을 수행하고
그 결과로 구성된 새로운 RDD를 생성한다.
val rdd1 = sc.parallelize(List("a","b","c")).map(_, 1)
val rdd2 = sc.parallelize(List("b","c")).map(_, 2)
val result1 = rdd1.leftOuterJoin(rdd2)
val result2 = rdd1.rightOuterJoin(rdd2)
o substractByKey
RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
rdd1과 rdd2라는 두 RDD가 있을 때 rdd1.substractByKey(rdd2)는 rdd1의 요소 중에서 rdd2에 같은 키가 존재하는
요소를 제외한 나머지로 구성된 새로운 RDD를 생성한다.
val rdd1 = sc.parallelize(List("a","b","c")).map(_, 1)
val rdd2 = sc.parallelize(List("b")).map(_, 2)
val result1 = rdd1.substractByKey(rdd2)
[집계 연산]
o reduceBykey
reduceByKey()는 RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드로서 같은 키를 가진
값들을 하나로 병합해 키-값 쌍으로 구성된 새로운 RDD를 생성한다.
병합을 수행하기 위해 두 개의 값을 하나로 합치는 함수를 인자로 전달받는데, 이때 함수가 수행하는 연산은
결합법칙과 교환법칙이 성립됨을 보장해야 한다. 왜냐하면 데이터가 여러 파티션에 분산돼 있어서 항상 같은 순서로
연산이 수행됨을 보장할 수 없기 때문이다.
- 두 개의 정수를 더하는 함수를 병합 함수로 사용
val rdd = sc.parallelize(List("a","b","c")).map(_, 1)
val result = rdd.reduceByKey(_ + _)
o foldbyKey
RDD 구성요소가 키와 값으로 구성된 경우에 사용할 수 있는 메소드이다. 전반적인 동작은 reduceBykey() 메서드와
유사해서 같은 키를 가진 값을 하나로 병합해 키-값 쌍으로 구성된 새로운 RDDㄹ르 생성한다. 하지만 reduceBykey와는
달리 병합 연산의 초기값을 메소드의 인자로 전달해서 병합시 사용할 수 있다는 점에서 차이가 있다.
교환법칙을 만족하지 않더라고 해당 연산에 대한 결합법칙만 만족한다면 사용할 수 있다. 단 이때 초기값은 여러 번
반복해도 연산 결과에는 영향을 주지 않는 값이어야 한다.
- 초기값 0을 인자로 두 개의 정수를 더하는 함수를 병합 함수로 사용
val rdd = sc.parallelize(List("a","b","c")).map(_, 1)
val result = rdd.foldByKey(0)(_ + _)
o combineByKey
combineBykey()는 RDD의 구성요고가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
reduceBykey()나 foldByKey()와 유사하게 같은 키를 가진 값들을 하나로 병합하는 기능을 수행하지만 수행하는 과정에
서 값의 타입이 바뀔 수 있다는 점에서 두 메소드와 차이가 있다.
def reduceByKey(func: (V,V) => V): RDD[(K,V)]
def foldByKey(zeroValue: V)(func: (V,V) => V): RDD[(K,V)]
def combineByKey(createCombiner: (V) => C, mergeConbiners: (C,C) => C): RDD[(K,C)]
reduceBykey와 foldByKey는 RDD[K,V], 즉 키의 타입이 K이고 값의 타입이 V인 FairRDDFunctions에 정의된 메소드인데 병합에 사용되는 함수와 최종 결과로 만들어진 RDD가 모두 원래의 RDD와 같은 키의 타입은 K, 값의 타입은 V를 유지하고 있다. 이에 반해 combineByKey는 최종 결과 RDD가 C 타입의 값을 가진 RDD[K,C]로 바뀌어 있다.
- createCombiner() : 값을 병합하기 위한 컴바이너를 생성한다. 컴바이너는 두 개의 값을 하나로 병합하는 객체이다.
컴바이너는 각 키별로 병합을 수행하고 그 결과를 내부에 저장한다. 만약 특정 키에 대해 정의된 컴바이너가 없다면
이함수를 이용해 컴바이너를 생성한다.
- mergeValue() : 키에 대한 컴바이너가 이미 존재한다면 새로운 컴바이너를 만들지 않고 이 함수를 이용해 값을 기존
컴바이너에 병합시킨다. 각 키별 컴바이너 내부에는 병합된 값들이 누적되어 쌓이게 된다.
- mergeCombiners() : createCombiner()와 mergeValue()는 파티션 단위로 수행된다. mergeCombiner()는 병합이 끝난
컴바이너들 끼리 다시 병합을 수행해 최종 컴바이너를 생성한다. 파티션 단위로 키별 컴바이너를 생성해서 병합을
수행하고 이렇게 생성된 컴바이너를 모두 벙합해 최종 결과를 생성한다.
- 평균 구하기
case class Record(var amount: Long, var number: Long=1){
def map(v: Long) = Record(v)
def add(amount: Long): Record = {
add(map(amount))
}
def add(other:Record): Record = {
this.number += other.number
this.amount += other.amount
}
override def toString: String = s"avg:${amount/number}"
}
val data = Seq("Math",100L),("Eng",80L),("Math",50L),("Eng",60L),("Eng",90L))
val rdd = sc.parallelize(data)
val createCombiner = (v:Long) => Record(v)
val mergeValue = (c:Record, v:Long) => c.add(v)
val mergeCombiners = (c1:Record, c2:Record) => c1.add(c2)
val result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiner)
o aggregateBykey
aggregateBykey()는 RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
combineByKey()의 특수한 경우로서 병합을 시작할 초깃값을 생성하는 부분을 제외하면 combineByKey()와
동일한 동작을 수행한다.
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C,V) => C, mergeCombiners: (C,C) => C): RDD[(K,C)]
def aggreateBykey[U](zeroValue:U)(seqOp: (U,V) => U, comOp: (U,U) => U): RDD[(K,C)]
mergeValue와 seqOP, mergeCombiners와 combOp는 서로 같은 역할ㅇ르 하는 함수이다. 차이점이 있다면
combineByKey메소드에서는 병합을 위한 초깃값을 알기 위해 createCombiner라는 함수를 사용하고
aggregateByKey는 zeroValue라는 값을 사용한다. 따라서 aggregateByKey는 combineByKey의 createCombiner함수의
특정 값 zero를 돌려주는 함수를 사용한 경우이다.
val data = Seq("Math",100L),("Eng",80L),("Math",50L),("Eng",60L),("Eng",90L))
val rdd = sc.parallelize(data)
val zero = Record(0,0)
val mergeValue = (c: Record, v:Long) => c.add(v)
val mergeCombiners = (c1: Record, c2:Record) => c1.add(c2)
val result = rdd.aggregateByKey(zeroad)(mergeValue, mergeCombiners)
[Pipe 및 파티션 연산]
o pipe
pipe를 이용하면 데이터를 처리하는 과정에서 외부 프로세스를 활용할 수 있다.
다음은 세 개의 숫자로 구성된 문자열을 리눅스 cut유틸리티를 이용해 분리한 뒤 첫 번째와 세 번째 숫자를 뽑아낸다.
val rdd = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
val result = rdd.pipe("cut -f 1,3, -d ,")
o coalesce, repatition
RDD를 생성한 뒤 filter() 연산을 비롯한 다양한 트랜스포메이션 연산을 수행하다 보면 최최에 설정된 파티션 개수가
적합하지 않은 경우가 발생할 수 있다. 이 경우 coalesce()나 repartition() 연산을 사용해 현재의 RDD의 파티션 개수를
조정할 수 있다.
두 메소드는 모두 파티션의 크기를 나타내는 정수를 인자로 받아서 파티션의 수를 조정할 수 있지만 처리 방식에 따른 성능의 차이가 있다. repartition이 파티션 수를 늘리거나 줄이는 것을 모두 할 수 있는 반면 coalesce는 줄이는 것만 가능하다.
repartition은 셔플을 기반으로 동작을 수행하는 반면 coalesce는 강제로 셔플을 수행하라는 옵션을 지정하지 않는 한
셔플을 사용하지 않기 때문에 성능이 좋다.
val rdd1 = sc.parallelize(1 to 1000000, 10)
val rdd2 = rdd1.coalesce(5)
val rdd3 = rdd2.repartition(10)
println(s"partiton size : ${rdd2.getNumPartitions}")
println(s"partiton size : ${rdd3.getNumPartitions}")
o repatitionAndSortWithinPartitions
파티션을 조정해야 하는 경우는 파티션 크기 외에도 다양하게 발생할 수 있다. 같은 성격을 지닌 데이터를 같은 파티션으로 분리하고 싶을 때이다.
하둡 맵리듀스 프로그램의 경우 동일한 성격을 가진 데이터를 하나의 리듀서에서 처리하기 위해 매퍼 출력 데이터의
키와 파티션을 담당하는 파티셔너(Partitioner), 그리고 소팅을 담당하는 컴퍼레이터(Comparator) 모듈을 조합해서
프로그램을 작성하는 패턴을 오래전부터 사용해 오고 있다.
repatitionAndSortWithinPartitions는 RDD를 구성하는 모든 데이터를 특정 기준에 따라 여러 개의
파티션으로 분리하고 각 파티션 단위로 정렬을 수행한 뒤 이 결과로 새로운 RDD를 생성해 주는 메소드이다.
이 메소드를 사용하려면 우선 데이터가 키와 값 쌍으로 구성돼 있어어 하고 메소드를 실행 할 때 각 데이터가 어떤
파티션에 속할지 결정하기 위한 파티셔너(Partitioner)를 설정해야 한다.
파티셔너는 각 데이터의 키 값을 이용해 데이터가 속할 파티션을 결정하게 되는데, 이때 키 값을 이용한 정렬도 함께
수행된다. 즉 파티션 재할당을 위해 셔플을 수행하는 단계에서 정렬도 함께 다루게 되어 파티션과 정렬을 각각 따로
하는 것에 비해 더 높은 성능을 발휘할 수 있다.
val r = scala.util.Random
val data = for(i <- 1 to 10) yield (r.nextInt(100), "-")
val rdd1 = sc.parallelize(data)
val rdd2 = rdd1.repatitionAndSortWithinPartitions(new HashPartitioner(0))
rdd2.foreachPartiton(it => {println("=====";it.foreach(v => println(v)) })
rdd가 준비되면 파티셔너로 HashPartitioner를 설정하고 크기를 3으로 지정한다. 따라서 모든 데이터는 3으로 나눈
나머지 값에 따라 총 3개의 파티션으로 분리된다.
foreachPartiton메소드는 RDD의 파티션 단위로 특정 함수를 실행해 주는 메소드이다.
o partitionBy
partitionBy()는 RDD가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
사용할 때 Partitoner 클래스의 인스턴스를 인자로 전달해야 한다.
Partitioner는 각 요소의 키를 특정 파티션에 할당하는 역할을 수행하는데 스파크에서 기본적으로 제공하는 것으로 HashPartitioner와 RangePartitioner의 두 종류가 있다. 만약 RDD의 파티션 생성 기준을 변경하고 싶다면 직접
Partitioner 클래스를 상속하고 커스트마이징한 뒤 partitonBy() 메소드의 인자로 전달해서 사용하면 된다.
val rdd1 = sc.parallelize(List("apple","mouse","monitor"),5).map{a => (a, a.length) }
val rdd2 = rdd1.partitionBy(new HashPartitioner(3))
println(s"rdd1:${rdd1.getNumPartitons}, rdd2:${rdd2.getNumPartitons}")
[필터와 정렬 연산]
o filter
filter()는 단어의 뜻 그대로 RDD의 요소 중에서 원하는 요소만 남기고 원하지 않는 요소는 걸러내는
동작을 하는 메소드이다. 동작 방식은 RDD의 어떤 요소가 원하는 조건에 부합하는지 여부를 참 or 거짓으로
가려내는 함수를 RDD의 각 요소에 적용해 그 결과가 참인 것은 남기고 거짓인 것은 버린다.
val rdd = sc.parallelize(1 to 5)
val result = rdd.filter(_ > 2)
filter 연산은 처음 RDD를 만들고 나서 다른 처리를 수행하기 전에 불필요한 요소를 사전에 제거하는
목적으로 많이 사용된다. 필터링 연산 후에 파티션 크기를 변경하고자 한다면 이전에 살펴본 coalesce()
메소드를 사용해 RDD 파티션 수를 맞춰 조정할 수 있다.
o sortBykey
sortBykey는 키 값을 기준으로 요소를 정렬하는 연산이다. 키 값을 기준으로 정렬하기 때문에 단연히
모든 요소가 키와 값 형태로 구성되어 있어야 한다.
val rdd = sc.parallelize(List("q","z","a"))
val result = rdd.map((_,1)).sortByKey()
o keys, values
keys()와 values()는 RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
두 메소드의 역할은 이름만 보고도 짐잓할 수 있는데, keys()는 키에 해당하는 요소로 구성된 RDD를
생성하고 values()는 값에 해당하는 요소로 구성된 RDD를 돌려준다.
val rdd = sc.parallelize(List(("k1","v1"),("k2","v2"),("k3","v3"))
println(rdd.keys.collect.mkString(","))
println(rdd.values.collect.mkString(","))
o sample
sample() 메소드를 이용하면 샘플을 추출해 새로운 RDD를 생성할 수 있다.
sample(withReplacement: Boolean, fraction: Double, seed:Long = Utils.random.nextLong): RDD[U]
withReplacement는 복원 추출을 수행할지 여부를 결정하는 것으로 true로 설정되면 복원추출을 수행한다.
fraction은 복원 추출과 비복원 추출일 경우 의미가 달라진다. 복원 추출일 경우 샘플 내에서 각 요소가
나타나는 횟수에 대한 기대값, 즉 각 요소의 평균 발생 횟수를 의미하며, 반드시 0 이상의 값을 지정해야 한다.
비복원 추출일 경우에는 각 요소가 샘플에 포함될 확률을 의미하며 0과 1사이의 값으로 지정할 수 있다.
seed는 일반적인 무작위 값 추출 시 사용하는것과 유사한 개념으로 반복 시행시 결과가 바뀌지 않고 일정한
값이 나오도록 제어하는 목적으로 사용할 수 있다.
sample메소드가 샘플의 크기를 정해놓고 추출을 실행하는 것인 아니다. fraction 0.5를 지정한다고 해서
전체 크기의 절반에 해당하는 샘플이 추출되는 것으로 기대해서는 안된다.
정확한 크기의 샘플을 추출하고자 한다면 takeSample 메소드를 사용해야 한다.
val rdd = sc.parallelize(1 to 100)
val result1 = rdd.sample(false, 0.5)
val result2 = rdd.sample(true, 1.5)
[참고] 위키북스 스파크2 프로그래밍 을 정리한 내용임