RDD는 장점은 분산 환경에서 메모리 기반으로 빠르고 안정적으로 동작하는 프로그램을 작성할 수  있다는 점이다. 하지만 성능 못지 않게 중요한 것이 바로 RDD가 제공하는 풍부한 처리 연산이라고 할 수 있다.
하지만 RDD의 아쉬운 점은 바로 데이터 값 자체는 표현이 가능하지만 데이터에 대한 메타 데이터, 소위 스키마에
대해서는 표현할 방법이 따로 없다는 점이다. 
스파크 SQL은 RDD의 이 같은 단점을 보완할 수 있도록 또 다른 유형의 데이터 모델과 API를 제공하는 스파크 모듈이다.

o 데이터셋
데이터셋은 스파크 1.6 버전에서 처음 소개된 데이터 모델이자 API이다. RDD와 마찬가지로 분산 오브젝트 컬렉션에 대한 프로그래밍 모델이며, 크랜스포메이션과 액션 연산을 포함하고 있다.

데이터셋 이전에는 데이터프레임이라는 API를 사용했는데 이것의 가장 큰 특징은 기존 RDD와는 전혀 다른 형태를 가진
SQL과 유사한 방식의 연산을 제공했다는 점이다. 숫자만으로 구성된 RDD의 모든 요소에 1을 더하기 위해 rdd.map(v => v+1)과
같은 map()연산을 상요한 것에 반해 동일한 요소로 구성된 데이터프레임에서는 df("col")+1과 같은 방법을 사용했다.
이러함 데이터 프로엠은 RDD에 비해 풍부한 API와 옵티마이저를 기반으로 높은 성능으로 복잡한 데이터 처리를 더욱 수월하게
수행할 수 있다는 장점이 있었지만 처리해야하는 작업의 특성에 따라서는 직접 프로그래밍이 가능한 RDD를 사용하는 것에 비해
복잡한 코드를 작성해야 하거나 컴파일 타입 오류 체크 기능을 사용할 수 없다는 단점이 있었다.

스파크2.0부터 데이터프레임이 데이터셋과 통합되면서 데이터셋은 RDD와 데이터프로엠이 제공하던 대부분의 기능을 지원할 수 있게 되었다. 즉 이제는 데이터셋 하나만 사용해 RDD에서 제공하던 형태의 연산도 사용할 수 있고, 기존 데이터프레밍과 같은 SQL기반의 연산도 사용할 수 있게 된 것이다.

연산의 종류와 주요 API
데이터셋에서 제공하는 연산은 RDD와 마찬가지로 액션과 트랜션포메이션이라는 두 종류로 분류할 수 있다.
RDD와 같이 데이터셋을 생성하는 연산은 트랜스포메이션 연산으로, 실제 데이터 처리를 수행하고 결과를 생성하는 연산은 액션 연산으로 분류된다.

데이터셋의 트랜스포메이션은 데이터 타입을 처리하는 방법에 따라 타입연산(typed operations)과 비타입연산(untyped operations)으로 나눌 수 있다. 

val data = 1 to 100 toList
val df = data.toDS
val result = df.map(_+1)
ds.select(col("value")+1)

map(_ + 1)은 데이터의 타입인 정수(Int)로 처리하는 반면 col("value")+1은 마치 데이터베이스의 테이블과 유사하게 처리한 것으로, ds라는 테이블에서 value 컬럼에 1을 더한 결과를 조회한 것과 유사하다. 이때 coㅣ("value")라는 부분은 바로 컬럼을 나타내는 부분으로 여기서 중요한 것은 그 타입이 원래 데이터 타압인 정수(Int)가 아닌 org.apache.spark.sql.Column 타입이라는 점이다.

실제로 데이터셋에는 SQL과 유사한 방식의 데이터 처리를 위해 데이터베이스의 로우(Row)와 칼럼(Column)에 해당하는 타입을 정의하고 있으며, 실제 데이터가 어떤 타입이든지 로우와 컬럼 타입으로 감싸서 처리할 수 있는 구조를 띠고 있다.

비타입 연산이라 데이터를 처리할 때 데이터 본래의 타입이 아닌 org.apache.spark,sql.Row와 org.apache.spark.sql.Column 타입의 객체로 감싸서 처리하는 연산이라고 할 수 있다.
스파크2.0부터는 데이터프레임을 가리켜 Row 타입 데이터셋 이라는 용어를 사용하기도 하는데 그 이유는 데이터프레임이라는 용어가 곧 org.apache.spark.sql.Row 타입의 요소를 포함하고 있는 데이터셋을 가리키는 것이기 때문이다. 


데이터셋과 데이터프레임의 주요 프로그래밍 구성 요소
- 스파크세션(SparkSession)
- 데이터셋(Dataset)
- 데이터프레임(DataFrame)
- DataFrameReader
- DataFrameWriter
- Row, Column
- functions
- StructType, StructField
- GroupedData, GroupedDataSet


o 스파크세션(SparkSession)
스파크세션은 데이터프레임(DataFrame) 또는 데이터셋(DataSet)ㅇ르 생성하거나 사용자 정의 함수(UDF)를 등록하기 위한 목적으로 사용되며, 스파크 SQL 프로그램은 가장 먼저 스파크세션을 생성하는 것으로 시작해야 한다.

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Sample").master("local[*]").getOrCreate()


o 데이터프레임, 로우, 컬럼
데이터프레임은 스파크SQL에서 사용하는 분산 데이터 모델이다. 스파크2.0 미만 버전에서는 데이터프레임이라는 별도의 클래스를 가리키는 용어였지만 스파크2.0부터 스파크 SQL의 또 다른 데이터 모델인 데이터셋과 통합되면서 org.apache.spark.sql.Row 타입의 요소를 가진 데이터셋을 가리키는 별칭으로 사용되고 있다.

스파크의 데이터프레임은 R의 데이터프레임이나 데이터베이스의 테이블과 비슷한 행(Row)과 열(Column)의 구조를 가지며, 데이터프레임에  포함된 데이터는 SQL문을 사용하거나 데이터프레임이 제공하는 프로그래밍 API를 이용해 처리할 수 있다.

데이터프레임을 사용하는 궁긍적인 목적은 RDD와 같이 분산 데이터를 저장하고 처리하기 위한 것이지만 RDD가 데이터와 값을 다루는데 초점을 맞추고 있다면 데이터프레임은 데이터 값뿐만아니라 데이터에 댛나 스키마 정보까지 함께 다루고자 한다는 점에서 차이가 있다.

외부 데이터소스로부터 데이터프레임 생성
파일이나 데이터베이스 같은 외부 저장소에 저장돼 있는 데이터를 읽어서 데이터프레임을 생성할 때 가장 손쉽게 처리할 수 있는 방법은 스파크세션이 제공하는 read() 메소드를 사용하는 것이다.
read() 메소드는 다양한 유형의 데이터소스로부터 데이터프레임을 생성할 수 있는 DataFrameReader 인스턴스를 생성하는데, 이 DataFrameReader를 이용해 데이터프레임을 생성할 수 있다.

-JSON 
val df = spark.read.format("json").option("allowComments","true").
         load("/examples/src/main/resources/people.json")


기존 RDD 및 로컬 컬렉션으로부터 데이터프레임 생성
스파크SQL은 스키마를 지정하는 두 가지 방법을 제공한다. 첫 번째는 리플렉션 API를 활용해 데이터의 스키마 정보를 자동으로 추론해 내는 방법으로 스키마 정의를 위한 별도의 추가 코드가 필요 없기 때문에 간결한 코드를 작성할 수 있다. 두 번째 방법은 직접 스키마 정보를 코드로 작성해서 지정하는 방법인데, 스키마 추론을 위한 부가적인 연산을 줄이고 스키마 정보를 원하는 대로 커스커마이징해서 사용할 수 있다.

1.리플렉션을 통한 데이터프레임 생성
특별한 스키마 정의를 추가하지 않아도 컬렉션에 포함된 오브젝트의 속성값으로부터 알아서 스키마 정보를 추출하고 데이터프레임을 생성한다.
리플렉션 방식을 사용하는 방법은 언어별로 차이가 있다. 스칼라의 경우 scala.Product의 하위 타입인 케이스클래스(Case Class)나 튜플(Tuple) 등의 컬렉션을 사용할 수 있다.

- List
val row1 = Persion("hayoon",7,"student")
val row2 = Persion("sunwoo",7,"student")
val row3 = Persion("hajoo",5,"kindergartener")
val data = List(row1, row2, row3)
val df = spark.createDataFrame(data)

- RDD
val rdd = sc.parallelize(data)
val df = spark.createDataFrame(rdd)

- Tuple
컬럼명은 _1, _2로 자동 부여
val row1 = ("col_1","col2")
val row2 = ("col_1","col2")
val data = list(row1, row2)
val df = spark.createDataFrame(data)


2.명시적 타입 지정을 통한 데이터프레임 생성
RDD의 모든 데이터가 실제 타입과 무관하게 문자열로만 구성돼 있거나 같은 값이라도 상황에 따라 데이터의
타입을 다르게 상용하고 싶다면 원하는 대로 스키마를 지정할 수 있는 방법이 필요하다.

스파크SQL은 직접 스키마를 지정할 수 있는 방법을 제공한다.
데이터프레임의 스키마 정보는 컬럼을 나타내는 StructFile, 로우를 나타내는 StructType으로 정의한다.
StructField에는 컬럼의 이름과 타입, null 허용 여부를 지정하고, StructType에는 컬럼으로 사용할  StructField의 목록을 지정한다. 이때 StructField의 사용 가능한 타입은 org.apache.spark,sql.types,DataType 추상 클래스의 하위 클래스로 정의돼 있다.

val sf1 = StructField("name",StringType, nullable=true)
val sf2 = StructField("age",IntegerType, nullable=true)
val sf3 = StructField("job",StringType, nullable=true)
val schema = StructType(List(sf1, sf2, sf3))

val rows = sc.parallelize(List(Row("hayoon",7,"student"),Row("sunwoo",7,"student"), Row("hajoo",5,"kindergartener"))
val df = spark.createDataFrame(rows, schema)

 

[참고] 위키북스 "스파크2 프로그래밍" 을 정리한 내용임

'Spark > Spark2Programming' 카테고리의 다른 글

Spark RDD  (0) 2019.11.26
Spark RDD Transformation  (0) 2019.11.11
Spark RDD Action  (0) 2019.11.11

RDD는 스파크가 사용하는 기본 데이터 모델로서 RDD를 잘 이해하고 다루는 것은 스파크 어플리케이션을 작성할 때 가장 중요한 기초라고 할 수 있다.

RDD를 다루기 전 알아둬야할 중요한 사항
- 스파크 클러스터
- 분산 데이터로서의 RDD
- RDD의 불변성
- 파티션
- HDFS
- Job과 Executor
- 드라이버 프로그램
- 트랜스포메이션과 액션
- 지연(lazy) 동작과 최적화
- 함수의 전달

o 데이터 타입에 따른 RDD 연산
RDD는 데이터 처리에 유용한 다양한 연산을 수행할 수 있는 메소드를 제공한다. RDD가 제공하는 메소드는 데이터 타입과 밀접한 관계를 맺고 있다. 따라서 스파크에서 자주 사용되는 특별한 데이터 유형에 대해 좀 더 특화된 메소드를 제공할 수 있도록 클래스를 제공하고 있다. 
- PairRDDFunctions : 키와 값의 형태로 구성된 데이터 연산을 제공하는 클래스
  groupBykey나 reduceByKey, mapValues와 같이 키를 사용하는 유용한 연산을 제공한다.
- OrderedRDDFunctions : 키와 값으로 구성된 데이터를 대상으로 하며, 키에 해당하는 변수 타입이 정렬 가능한 경우

  를 위한 것이다. sortBykey와 같은 연산을 제공한다.
- DoubleRDDFunctions : double 유형의 데이터를 위한 것으로 합을 구하는 sum()이나 평균을 구하는 mean()과 같은 

  연산을 제공한다.
- SequenceFileRDDFunctions : 하둡의 시퀀스 파일을 다루기 위한 연산을 제공한다.

o 스파크컨텍스트 생성
스파크컨텍스트는 스파크 애플리케이션과 클러스터의 연결을 관리하는 객체로서 모든 스프크 애플리케이션은 반드시
스파크컨텍스트를 생성해야 한다. RDD를 비롯한 스파크에서 사용하는 주요 객체는 스파크컨텍스트를 이용해 생성 할 
수 있다.

val conf = new SparkConf().setMaster("local[*]").setAppName("RDDCreateSample")
val sc = new SparkContext(conf)

o RDD 생성
Spark는 두 종류의 RDD 생성 방법을 제공한다.
- 드라이버 프로그램의 컬렉션 객체를 이용
- 데이터베이스와 같은 외부 데이터를 읽어서 새로운 RDD를 생성

val rdd1 = sc.paralleize("a","b","c","d")   // 컬렉션 객체를 이용한 RDD 생성
val rdd2 = sc.parallelize(1 to 1000, 10)  // 파티션 개수 지정 
val rdd3 = sc.textfile("/README.md")    // 외부파일을 이용한 RDD 생성 

RDD가 제공하는 연산은 트랜스포메이션과 액션으로 나눌 수 있다. 두 연산을 구분하는 기준은 연산의 결과가 RDD인지 아닌지에 달려 있다. 트랜스포메이션은 기존 RDD를 이용해 새로운 RDD를 생성하는 연산이다.

o 클러스터 환경의 공유변수
하둡이나 스파크와 같이 클러스터 환경에서 동작하는 애플리케이션은 하나의 잡(Job)을 수행하기 위해 클러스터에
속한 다수의 서버에서 여러 개의 프로세스를 실행하게 되므로 모든 프로세스가 공유할 수 있는 자원을 관리하기란
쉽지 않다.
따라서 이러한 프레임워크들은 다수의 프로세스가 공유할 수 있는 읽기 자원과 쓰기 자원을 설정할 수 있또록 지원하는데
하둡은 분산캐시와 카운터를, 스파크는 브로드캐스드 변수(Broadcast Variables)와 어큐뮬레이터(Accumulators)를 제공한다.

o broadcast 변수
스파크의 잡이 실행되는 동안 클러스터 내의 모든 서버에서 공유할 수 있는 일기전용 자원을 설정할 수 있는 변수
예를 들어 사용자ID목록이 담긴 세트 컬렉션 타입의 데이터를 공유 변수로 설정해 각 서버에서 로그를 처리하며서 현재 처리
하려는 로그가 우리가 찾고 이쓴 사용자의 로그가 맞는지 확인하는 등의 용도로 사용할 수 있다.

먼저 공유 객체를 생성하고, 스파크컨텍스트의 broadcast 메소드의 인자로 등록한다.
이렇게 생성된 브로드캐스트 변수의 value() 메소드를 통해 접근한다.

val broadcastUsers = sc.broadcast(Set("u1","u2"))
val rdd = sc.parallelize(List("u1","u2","u3","u4","u5","u6"),3)
val result = rdd.filter(broadcastUsers.value.contains(_))

o accumulator 변수
브로드캐스트 변수가 읽기 동작을 위한 것이라면 어큐뮬레이터는 쓰기 동작을 위한 것이다. 
예를 들어 다수의 서버로 구성된 클러스터 환경에서 동작하는 프로그램인 경우 오류가 발생했을 때 이 문제가 어느 서버의 어떤
프로세스에서 발생한 것인지 확인하는 것이 쉽지 않다. 따라서 클러스트의 각 서버에서 보내는 에러 정보를 한 곳에 모아서 
볼 수 방법이 있다면 손쉽게 에러 상황을 디버깅할 수 있을 것이다.
어큐뮬레이터는 이처럼 클러스터 내의 모든 서버가 공유하는 쓰기 공간을 제공함으로써 각 서버에서 발생하는 특정 이벤트의 수를
세거나 관찰하고 싶은 정보를 모아 두는 등의 용도로 편리하게 활용할 수 있다.

어큐뮬레이터를 생성하려면 ora.apache.spark.util.AccumulatorV2 클래스를 상속받은 클래스를 정의하여 인스턴스를 생성해야 한다.
그리고 생성한 인스턴스를 스파크 컨텍스트에서 제공하는 register() 메소드를 이용하여 등록한다.

스파크에서는 자주 사용되는 몇 가지 데이터 타입에 대한 어큐뮬레이터를 미리 정의해 뒸으며 이를 편리하게 사용할 수 있게 생성과
등록 작업을 한번에 처리하는 간단한 메소드를 컨텍스트를 통해 제공한다.
- long Accumulator()
- collectionAccumulator()
- doubleAcculator()

val acc1 = sc.longAccumulator("invalidFormat")
val acc2 = sc.collectionAccumulator("invalidFormat2")
val data = List("U1:Addr1","U2:Addr2","U3","U4:Addr4","U5;Addr5","U6:Addr6","U7::Addr7")
sc.parallelize(data,3).foreach{ v =>
  if(v.split(":").length !=2){
    acc1.add(1L)
    acc2.add(v)
  }
}

자체 어큐뮬레이터를 정의하려면 org.apache.spark.util.AccumulatorV2 추상 클래스를 상속받고 isZero(), copy() 등의 필요한
메소드를 정의해야 한다. 이때 AccumulatorV2[Record, Long]에서 각각 입력 데이터 타입과 출력 데이터 타입을 의미한다.
isZero()의 경우 RecordAccumulator에 포함된 Record의 값이 초깃값에 해당하는지 여부를 확인하기 위한 것으로, 초깃값이 맞으면
true, 아니면 false를 돌려준다.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.util.AccumulatorV2

class RecordAccumulator extends AccumulatorV2[Record, Long]{
  private var _record = Record(0)
  def isZero:Boolean = _record.amount == 0 && _record.number==1
  def copy():AccumulatorV2[Record, Long] = {
    val newAcc = new RecordAccumulator
    newAcc._record = Record(_record.amount, _record.number)
    newAcc
  }
  df reset(): Unit = {
    _record.amount = 0L
    _record.number = 1L
  }
  def add(other: Record): Unit = {
    _record.add(other)
  }
  def merge(other: AccumulatorV2[Record, Long]): Unit = other match {
    case o :  RecordAccumulator => _record.add(o._record)
    case _ => throw new RuntimeException    
  }
  def value: Long = {
    _record.amount
  }
}

val acc = new RecordAccumulator
sc.register(acc, "invalidFormat")
val data = List("U1:Addr1","U2:Addr2","U3","U4:Addr4","U5;Addr5","U6:Addr6","U7::Addr7")
sc.parallelize(data,3).foreach{ v =>
  if(v.split(":").length !=2){
    acc.add(Record(1))
  }
}

어큐뮬레이터를 증가시키는 동작은 클러스터의 모든 데이터 처리 프로세스에서 가능하지만 데이터를 읽는 동작은
드라이버 프로그램 내에서만 가능하다. 즉 RDD의 트랜스포메이션이나 액션 연산 내부에서는 어큐뮬레이터의 값을
증가시킬 수만 있을 뿐 그 값을 참조해서 사용하는 것은 불가능하다.
일부러 의도한 특별한 목적이 없는 한 어큐뮬레이터는 액션 연산을 수행하는 메소드에서만 사용해야 한다.
그 이유는 스파크의 트랜스포메이션 연산은 액션 연산과 달리 하나의 잡 내에서 필요에 따라 수차례 반복 실행될
수 있기 때문이다. 따라서 map()이나 flatMap()과 같은 트랜스포메이션 연산 내용에 어큐뮬레이터의 값을 증가
시키는 코드가 포함될 경우 정확하지 않은 데이터가 집계될 수 있다.

[참고] 위키북스 스파크2 프로그래밍 을 정리한 내용임

'Spark > Spark2Programming' 카테고리의 다른 글

Spark SQL  (0) 2019.12.04
Spark RDD Transformation  (0) 2019.11.11
Spark RDD Action  (0) 2019.11.11

트랜스포메이션은 기존 RDD를 이용해 새로운 RDD를 생성하는 연산이다. 
각 요소의 타입을 문자열에서 숫자로 바꾸거나 불필요한 연산을 제외하거나 기존 요소의 값에 특정 값을 더하는 등의

작업이 모두 포함된다. 자주 사용하는 연산은 맵연산, 그룹화 연산, 집합 연산, 파티션 연산, 필터와 정렬 연산 등이다. 

맵(Map) 연산 : 요소간의 매핑을 정의한 함수를 RDD에 속하는 모든 요소에 적용해 새로운 RDD를 생성 
그룹화 연산 : 특정 조건에 따라 요소를 그룹화하거나 특정 함수를 적용 
집합 연산 : RDD에 포함된 요소를 하나의 집합으로 간주할 때 서로 다른 RDD 간에 합집합, 교집합 등을 계산 
파티션 연산 : RDD 파티션 개수를 조정 
필터와 정렬 연산 : 특정 조건을 만족하는 요소만 선택하거나 각 요소를 정해진 기준에 따라 정렬 


[맵 연산]  
o map 
스파크를 이용한 데이터 처리 작업에서 흔히 사용되는 대표적인 연산 중 하나이다. 
map() 메소드가 인자값으로 함수 하나를 전달받아 RDD에 속하는 모든 요소에 적용한 뒤 그 결과값으로 구성된

새로운 RDD를 생성하여 반환한다. 

val rdd_map = rdd.map(_+1) 
print(rdd_map.collect.mkString(" ")) 

 

o flatMap 
flatMap() 메소드는 map() 메소드와 유사하게 동작한다. 
두 메소드 간의 가장 큰 차이점은 인자로 전달되는 함수가 반환하는 값의 타입이 다르다는 점이다. 

map API : map[U](f:(T) => U) : RDD[U] 
flatMap API : flatMap[U](f:(T) => TraversableOnce[U]) : RDD[U] 

눈여겨 봐야할 부분은 두 메소드의  인자로 전달되고 있는 f라는 함수가 반환하는 값의 타입이다. 
f 함수를 보면 map()에서는 U타입의 출력값을 돌려주는 반면 flatMap()에서는 TraversableOnce[U]라는 타입의 
출력값을 돌려주고 있다. 
U라는 문자는 어떤 값이든 가능하다는 뜻으로 map() 메소드의 경우 인자로 사용하는 함수의 반환 타입에 제약이 

없지만 flatMap() 메소드의 경우는 일정한 규칙을 따라야 한다. 
TraversableOnce는 스칼라에서 사용하는 이터레이션(Iteration) 타입 중 하나이다.  따라서 이 선언의 의미는 flatMap에 사용하는 함수 f는 반환값으로 리스트나 시퀀스 같은  여러 개의 값을 담은 일종의 컬렉션과 유사한 타입의 값을 반환해야 한다. 

val fruits = List("apple,orange", "grape,apple,mango", "blueberry,tomato,orange") 
val rdd1 = sc.parallelize(fruits) 
val rdd2 = rdd1.map(_.split(",")) 
println(rdd2.collect().map(_.mkString("{", ", ", "}")).mkString("{", ", ", "}")) 

val rdd2 = rdd1.flatMap(_.split(",")) 
print(rdd2.collect.mkString(", ")) 

map()의 정의 f:(T)=>U 에서 T가 문자열이고 U가 배열이므로 결과값의 타입은 RDD[배열]이다. 
각 배열 속에 포함된 요소를 모두 배열 밖으로 끄집어내는 작업을 위해서는 flatMap()을 사용해야 한다. 

하나의 입력값에 대응하는 반환값이 여러 개 일 때 유용하게 사용할 수 있다. 

val rdd2 = rdd1.flatMap( log => { 
  // apple이라는 단어가 포함된 경우만 처리 
  if(log.contains("apple"){ 
    Some(log.indexOf("apple")) 
  } else{ 
    None 
  } 
}) 

Some과 None은 값이 있거나 없을 수 있는 옵션 상황을 표시하는 스칼라 타입이다. 
flatMap()에서 요구하는 TraversableOnce 타입은 아니지만 내부적인 반환에 의해  Some은 값이 있는 집합, 

None은 값이 없는 집합처럼 취급된다. 따라서 최종 결과로 모든 집합의 값을 하나로 만들면 Some에 포함된

들만 남게 된다. 


o mapPartitions 
map과 flatMap 메소드가 RDD의 각 요소를 하나씩 처리한다면 mapPartitions()는 파티션 단위로 처리한다. 

인자로 전달받은 함수를 파티션 단위로 적용하고, 그 결과로 구성된  새로운 RDD를 생성한다. 
파티션 단위로 파티션에 속한 모든 요소를 한 번의 함수 호출로 처리할 수 있기 때문에 파티션 단위의 중간 

산출물을 만들거나 데이터베이스 연결과 같은 고비용의 자원을 파티션 단위로 공유해 사용할 수 있다는 장점이 있다. 

val rdd1 = sc.parallelize(1 to 10, 3) 
val Wrdd2 = rdd1.mapPartitions(numbers => { 
  print("DB 연결") 
  numbers.map { number => number + 1 } 
}) 

numbers는 각 파티션에 담긴 요소를 가리키는 것으로 실제로는 각 파티션 내의 요소에 대한 이터레이터(Iterator)이

다. 따라서 mapPartions()에 사용되는 매핑용 함수는 각 파티션 요소에 대한 이터레이터를 전달받아 함수 내부에서

파티션의 개별 요소에 대한 작업을 처리하고 그 결과를 다시 이터레이터 타입으로 되돌려줘야 한다. 

 


o mapPartitionsWithIndex 
mapPartitionsWithIndex는 인자로 전달받은 함수를 파티션 단위로 적용하고 그 결과값으로 구성된 새로운 RDD를 

생성하는 메소드이다. 유사한 이름의 mapPartitions()와 다른 점은 인자로 전달받은 함수를 호출할 때 파티션에 속한

요소이 정보뿐만아니라 해당 파티션의 인덱스 정보도 함께 전달해 준다. 

다음은 파티션 번호(idx) 를 이용해 첫 번째 파티션에서만 결과를 추출한다. 
val rdd1 = sc.parallelize(1 to 10, 3) 
val rdd2 = rdd1.mapPartitionsWithIndex((idx, numbers) => { 
  numbers.flatMap { 
    case number if idx == 1 => Option(number + 1) 
    case _                  => None 
  } 
}) 

 

o mapValues 
RDD 요소가 키와 값의 쌍으로 이루고 있는 경우 PairRDD라는 용어를 사용한다. 
페어RDD에 속하는 데이터는 키를 기준으로 작은 그룹들을 만들고 해당 그룹들에 속한 값을  대상으로 합계나

평균을 구하는 등의 연산을 수행한다. 
mapValues()는 RDD의 모든 요소들이 키와 값의 쌍을 이루고 있는 경우메만 사용 가능한 메소드이며, 
인자로 전달받은 함수를 값에 해당하는 요소에만 적용하고 그 결과로 새로운 RDD를 생성한다. 
즉 키에 해당하는 부분은 그대로 두고 값에만 map()연산을 적용한 것과 같다. 

val rdd = sc.parallelize(List("a", "b", "c")).map((_, 1)) 
val result = rdd.mapValues(i => i + 1) 
println(result.collect.mkString("\t")) 
    


o flatMapValues 
flatMapValues는 RDD의 구성요소가 키와 값으로 이루어진 경우에만 사용할 수 있다. 
mapValues()와 마찬가지로 키는 그대로 두고 값에 해당하는 요소만을 대상으로 flatMap() 연산을 적용한다. 

val rdd = sc.parallelize(Seq((1,"a,b"),(2,"a,c"),(3,"d,c"))) 
val result = rdd.flatMapValues(_.split(",")) 
println(result.collect.mkString(" ")) 

 

[그룹화 연산]
o zip
두 개의 서로 다른 RDD를 각 요소의 인덱스에 따라 하나의(키,값)으로 묶는다.
첫 번째 RDD의 n번째 요소를 키로 하고 두 번째 RDD의 n번째 요소를 값으로 하는순서쌍을 생성한다.
두 RDD는 파티션의 개수와 각 파티션에 속하는 요소의 수가 동일하다고 가정한다.

val rdd1 = sc.parallelize(List("a","b","c"))
val rdd2 = sc.parallelize(List(1,2,3))
val result = rdd1.zip(rdd2)

 


o zipPartitions
zip() 메소드가 두 개의 서로 다른 RDD 요소를 쌍으로 묶어주는 역할만 수행했다면 zipPartitions()는 파티션

단위로 zip() 연산을 수행하고 특정 함수를 적용해 그 결과로 구성된 새로운 RDD를 생성한다.
zip과 zipPartitions의 중요한 차이점은 zipPartitions()은 요소들의 집합 단위로 병합을 실행하므로 파티션의

개수만 동일해도 된다. 두 번째로 zip()은 인자로 최대 1개의  RDD만 지정하지만 zipPartitions()은 최대 4개까지

지정할 수 있다. 마지막으로 zip()은 단지 두 RDD 요소의 쌍을 만들기만 하지만 zipPartitions()은 병합에 사용할

함수를  인자로 전달받아 사용할 수 있다.

val rdd1 = sc.parallelize(List("a","b","c"),3)
val rdd2 = sc.parallelize(List(1,2,3),3)
val result = rdd1.zipPartitions(rdd2){
  (it1, it2) => for{
  v1 <- it1
  v2 <- it2
  } yield v1+v2
}

 


o groupBy
RDD의 요소를 일정한 기준에 따라 여러 개의 그룹으로 나누고 이 그룹으로 구성된  새로운 RDD를 생성한다. 

각 그룹은 키와 그 키에 속한 요소의 시퀀스로 구성되며,  메소드의 인자로 전달하는 함수가 각 그룹의 키를 

결정하는 역할을 담당한다.

val rdd = sc. parallelize(1 to 100)
val result = rdd.groupBy{
  case i:Int if(i % 2==0) => "even"
  case _ => "odd"
}
result.collect.foreach{
  v => println(s"${v._1}, [$v._2.mkString(",")]")
}
result.foreach{ v => println(s"${v._2}")}

 


o groupBykey
groupByKey()는 키를 기준으로 같은 키를 가진 요소들로 그룹을 만들고 이 그룹들로 구성된 새로운 RDD를 생성한다.

이때 각 그룹은 groupBy()와 같이 키와 그 키에 속한  요소의 시퀀스로 구성된다.
groupBy() 메소드가 요소의 키를 생성하는 작업과 그룹으로 분류하는 작업을 동시에 수행한다면  groupByKey()는

이미 RDD의 구성요소가 키와 값의 쌍으로 이뤄진 경우에 사용 가능한 메소드이다.

val rdd = sc.parallelize(List("a","b","c","b","c")).map((_,1))
val result = rdd.groupByKey
result.collect.foreach { 
  v => println(s"${v._1}, [${v._2.mkString(",")}]")  
}
 

 

o cogroup
RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에만 사용할 수 있는 메소드이다.
여러 RDD에 같은 키를 갖는 값 요소를 찾아서 키와 그 키에 속하는 요소의 시퀀스로 구성된 튜플을 만들고, 

그 튜플들로 구성된 새로운 RDD를 생성한다.

val rdd1 = sc.parallelize(List(("k1","v1"),("k2","v2"),("k1","v3")))
val rdd2 = sc.parallelize(List(("k1","v4"),("k2","v3")))
val result = rdd1.cogroup(rdd2)
result.collect.foreach {
  case (k, (v_1, v_2)) => 
    println(s"($k, [${v_1.mkString(",")}], [${v_2.mkString(",")}])")
}

cogroup의 실행결과 생성된 result RDD는 rdd1과 rdd2의 전체 키의 개수와 동일한 두 개의  요소를 가진다. 

result의 각 요소는 튜플로 구성되는데 튜플의 첫번째 요소는 키를 의미하고 두 번째 요소는 그 키에 속하는 

값들의 시퀀스를 요소로 하는 또 다른 튜플로 구성된다.
이때 rdd1 요소 중에서 해당 키에 속하는 값들의 시퀀스가 첫 번째, rdd2의 요소의 값들은 두 번째에 위치한다.

 

 

[집합 연산]
o distinct
RDD 요소 중에서 중복을 제외한 요소로만 구성된 새로운 RDD를 생성한다.

val result = rdd.distinct()
println(result.collect.mkString(","))

 


o cartesian
두 RDD 요소의 카테시안곱을 구하고 그 결과를 요소로 하는 새로운 RDD를 생성한다.

val rdd1 = sc.parallelize(List((1,2,3)
val rdd2 = sc.parallelize(List("a","b","c"))
val result = rdd1.cartesian(rdd2)

 

o substract
rdd1과 rdd2라는 두 개의 RDD가 있을 때 rdd1에는 속하는 rdd2에는 속하지 않는 요소로 구성된
새로운 RDD를 생성한다.

val rdd1 = sc.parallelize(List("a","b","c","d","e"))
val rdd2 = sc.parallelize(List("d","e"))
val result = rdd1.substract(rdd2)

 

o union
rdd1과 rdd2라는 두 개의 RDD가 있을 때 rdd1 또는 rdd2에 속하는 요소로 구성된 새로운 RDD를 생성한다.

val rdd1 = sc.parallelize(List("a","b","c"))
val rdd2 = sc.parallelize(List("d","e"))
val result = rdd1.union(rdd2)

 

o intersection
rdd1과 rdd2라는 두 개의 RDD가 있을 때 rdd1과 rdd2에 동사에 속하는 요소로 구성된 새로운 RDD를 생성한다.
intersection()의 결과로 생성된 RDD에는 중복된 원소가 존재하지 않는다.

val rdd1 = sc.parallelize(List("a","a","b","c"))
val rdd2 = sc.parallelize(List("a","c","c","e"))
val result = rdd1.intersection(rdd2)

 

o join
RDD의 요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
두 RDD에서 서로 같은 키를 가지고 있는 요소를 모아서 그룹을 형성하고, 이 결과로 구성된 새로운 RDD를 생성한다. 

데이터베이스의 조인과 같은 유사한 동작을 수행한다.

val rdd1 = sc.parallelize(List("a","b","c","d","e")).map(_, 1)
val rdd2 = sc.parallelize(List("b","c")).map(_, 2)
val result = rdd1.join(rdd2)

 

o leftOuterJoin, rightOuterJoin
RDD의 구성 요소가 키와 값으로 구성된 경우 사용할 수 있다.
rdd1과 rdd2라는 두 개의 RDD가 있을 때 두 메소드는 이름 그대로 왼쪽 외부 조인과 오른쪽 외부 조인을 수행하고
그 결과로 구성된 새로운 RDD를 생성한다.

val rdd1 = sc.parallelize(List("a","b","c")).map(_, 1)
val rdd2 = sc.parallelize(List("b","c")).map(_, 2)
val result1 = rdd1.leftOuterJoin(rdd2)
val result2 = rdd1.rightOuterJoin(rdd2)

 

o substractByKey
RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
rdd1과 rdd2라는 두 RDD가 있을 때 rdd1.substractByKey(rdd2)는 rdd1의 요소 중에서 rdd2에 같은 키가 존재하는
요소를 제외한 나머지로 구성된 새로운 RDD를 생성한다.

val rdd1 = sc.parallelize(List("a","b","c")).map(_, 1)
val rdd2 = sc.parallelize(List("b")).map(_, 2)
val result1 = rdd1.substractByKey(rdd2)

 


[집계 연산]
o reduceBykey
reduceByKey()는 RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드로서 같은 키를 가진

값들을 하나로 병합해 키-값 쌍으로 구성된 새로운 RDD를 생성한다.
병합을 수행하기 위해 두 개의 값을 하나로 합치는 함수를 인자로 전달받는데, 이때 함수가 수행하는 연산은 
결합법칙과 교환법칙이 성립됨을 보장해야 한다. 왜냐하면 데이터가 여러 파티션에 분산돼 있어서 항상 같은 순서로

연산이 수행됨을 보장할 수 없기 때문이다.

- 두 개의 정수를 더하는 함수를 병합 함수로 사용
val rdd = sc.parallelize(List("a","b","c")).map(_, 1)
val result = rdd.reduceByKey(_ + _)

 

o foldbyKey
RDD 구성요소가 키와 값으로 구성된 경우에 사용할 수 있는 메소드이다. 전반적인 동작은  reduceBykey() 메서드와

유사해서 같은 키를 가진 값을 하나로 병합해 키-값 쌍으로 구성된 새로운 RDDㄹ르 생성한다. 하지만 reduceBykey와는

달리 병합 연산의 초기값을 메소드의 인자로 전달해서 병합시 사용할 수 있다는 점에서 차이가 있다.
교환법칙을 만족하지 않더라고 해당 연산에 대한 결합법칙만 만족한다면 사용할 수 있다. 단 이때 초기값은 여러 번 

반복해도 연산 결과에는 영향을 주지 않는 값이어야 한다.

- 초기값 0을 인자로 두 개의 정수를 더하는 함수를 병합 함수로 사용
val rdd = sc.parallelize(List("a","b","c")).map(_, 1)
val result = rdd.foldByKey(0)(_ + _)

 

o combineByKey
combineBykey()는 RDD의 구성요고가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
reduceBykey()나 foldByKey()와 유사하게 같은 키를 가진 값들을 하나로 병합하는 기능을 수행하지만 수행하는 과정에

서 값의 타입이 바뀔 수 있다는 점에서 두 메소드와 차이가 있다.

def reduceByKey(func: (V,V) => V): RDD[(K,V)]
def foldByKey(zeroValue: V)(func: (V,V) => V): RDD[(K,V)]
def combineByKey(createCombiner: (V) => C, mergeConbiners: (C,C) => C): RDD[(K,C)]

reduceBykey와 foldByKey는 RDD[K,V], 즉 키의 타입이 K이고 값의 타입이 V인 FairRDDFunctions에 정의된 메소드인데 병합에 사용되는 함수와 최종 결과로 만들어진 RDD가 모두 원래의 RDD와 같은 키의 타입은 K, 값의 타입은 V를 유지하고 있다. 이에 반해 combineByKey는 최종 결과 RDD가 C 타입의 값을 가진 RDD[K,C]로 바뀌어 있다.

- createCombiner() : 값을 병합하기 위한 컴바이너를 생성한다. 컴바이너는 두 개의 값을 하나로 병합하는 객체이다.
  컴바이너는 각 키별로 병합을 수행하고 그 결과를 내부에 저장한다. 만약 특정 키에 대해 정의된 컴바이너가 없다면
  이함수를 이용해 컴바이너를 생성한다.
- mergeValue() : 키에 대한 컴바이너가 이미 존재한다면 새로운 컴바이너를 만들지 않고 이 함수를 이용해 값을 기존 
  컴바이너에 병합시킨다. 각 키별 컴바이너 내부에는 병합된 값들이 누적되어 쌓이게 된다.
- mergeCombiners() : createCombiner()와 mergeValue()는 파티션 단위로 수행된다. mergeCombiner()는 병합이 끝난 
  컴바이너들 끼리 다시 병합을 수행해 최종 컴바이너를 생성한다. 파티션 단위로 키별 컴바이너를 생성해서 병합을 
  수행하고 이렇게 생성된 컴바이너를 모두 벙합해 최종 결과를 생성한다.

- 평균 구하기
case class Record(var amount: Long, var number: Long=1){
  def map(v: Long) = Record(v)
  def add(amount: Long): Record = {
    add(map(amount))
  }
  def add(other:Record): Record = {
    this.number += other.number
    this.amount += other.amount
  }
  override def toString: String = s"avg:${amount/number}"
}

val data = Seq("Math",100L),("Eng",80L),("Math",50L),("Eng",60L),("Eng",90L))
val rdd = sc.parallelize(data)
val createCombiner = (v:Long) => Record(v)
val mergeValue = (c:Record, v:Long) => c.add(v)
val mergeCombiners = (c1:Record, c2:Record) => c1.add(c2)
val result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiner)
 

o aggregateBykey
aggregateBykey()는 RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
combineByKey()의 특수한 경우로서 병합을 시작할 초깃값을 생성하는 부분을 제외하면 combineByKey()와
동일한 동작을 수행한다.

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C,V) => C, mergeCombiners: (C,C) => C): RDD[(K,C)]
def aggreateBykey[U](zeroValue:U)(seqOp: (U,V) => U, comOp: (U,U) => U): RDD[(K,C)]

mergeValue와 seqOP, mergeCombiners와 combOp는 서로 같은 역할ㅇ르 하는 함수이다. 차이점이 있다면
combineByKey메소드에서는 병합을 위한 초깃값을 알기 위해 createCombiner라는 함수를 사용하고
aggregateByKey는 zeroValue라는 값을 사용한다. 따라서 aggregateByKey는 combineByKey의 createCombiner함수의 
특정 값 zero를 돌려주는 함수를 사용한 경우이다.

val data = Seq("Math",100L),("Eng",80L),("Math",50L),("Eng",60L),("Eng",90L))
val rdd = sc.parallelize(data)
val zero = Record(0,0)
val mergeValue = (c: Record, v:Long) => c.add(v)
val mergeCombiners = (c1: Record, c2:Record) => c1.add(c2)
val result = rdd.aggregateByKey(zeroad)(mergeValue, mergeCombiners)


[Pipe 및 파티션 연산]
o pipe
pipe를 이용하면 데이터를 처리하는 과정에서 외부 프로세스를 활용할 수 있다.
다음은 세 개의 숫자로 구성된 문자열을 리눅스 cut유틸리티를 이용해 분리한 뒤 첫 번째와 세 번째 숫자를 뽑아낸다.

val rdd = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
val result = rdd.pipe("cut -f 1,3, -d ,")

 


o coalesce, repatition
RDD를 생성한 뒤 filter() 연산을 비롯한 다양한 트랜스포메이션 연산을 수행하다 보면 최최에 설정된 파티션 개수가 

적합하지 않은 경우가 발생할 수 있다. 이 경우 coalesce()나 repartition() 연산을 사용해 현재의 RDD의 파티션 개수를

조정할 수 있다.

두 메소드는 모두 파티션의 크기를 나타내는 정수를 인자로 받아서 파티션의 수를 조정할 수 있지만 처리 방식에 따른 성능의 차이가 있다. repartition이 파티션 수를 늘리거나 줄이는 것을 모두 할 수 있는 반면 coalesce는 줄이는 것만 가능하다.
repartition은 셔플을 기반으로 동작을 수행하는 반면 coalesce는 강제로 셔플을 수행하라는 옵션을 지정하지 않는 한

셔플을 사용하지 않기 때문에 성능이 좋다.

val rdd1 = sc.parallelize(1 to 1000000, 10)
val rdd2 = rdd1.coalesce(5)
val rdd3 = rdd2.repartition(10)
println(s"partiton size : ${rdd2.getNumPartitions}")
println(s"partiton size : ${rdd3.getNumPartitions}")

 

o repatitionAndSortWithinPartitions
파티션을 조정해야 하는 경우는 파티션 크기 외에도 다양하게 발생할 수 있다. 같은 성격을 지닌 데이터를 같은 파티션으로 분리하고 싶을 때이다.
하둡 맵리듀스 프로그램의 경우 동일한 성격을 가진 데이터를 하나의 리듀서에서 처리하기 위해 매퍼 출력 데이터의

키와 파티션을 담당하는 파티셔너(Partitioner), 그리고 소팅을 담당하는 컴퍼레이터(Comparator) 모듈을 조합해서 

프로그램을 작성하는 패턴을 오래전부터 사용해 오고 있다.

repatitionAndSortWithinPartitions는 RDD를 구성하는 모든 데이터를 특정 기준에 따라 여러 개의 
파티션으로 분리하고 각 파티션 단위로 정렬을 수행한 뒤 이 결과로 새로운 RDD를 생성해 주는 메소드이다.
이 메소드를 사용하려면 우선 데이터가 키와 값 쌍으로 구성돼 있어어 하고 메소드를 실행 할 때 각 데이터가 어떤 

파티션에 속할지 결정하기 위한 파티셔너(Partitioner)를 설정해야 한다.

파티셔너는 각 데이터의 키 값을 이용해 데이터가 속할 파티션을 결정하게 되는데, 이때 키 값을 이용한 정렬도 함께

수행된다. 즉 파티션 재할당을 위해 셔플을 수행하는 단계에서 정렬도 함께 다루게 되어 파티션과 정렬을 각각 따로

하는 것에 비해 더 높은 성능을 발휘할 수 있다.

val r = scala.util.Random
val data = for(i <- 1 to 10) yield (r.nextInt(100), "-")
val rdd1 = sc.parallelize(data)
val rdd2 = rdd1.repatitionAndSortWithinPartitions(new HashPartitioner(0))
rdd2.foreachPartiton(it => {println("=====";it.foreach(v => println(v)) })

rdd가 준비되면 파티셔너로 HashPartitioner를 설정하고 크기를 3으로 지정한다. 따라서 모든 데이터는 3으로 나눈 

나머지 값에 따라 총 3개의 파티션으로 분리된다.
foreachPartiton메소드는 RDD의 파티션 단위로 특정 함수를 실행해 주는 메소드이다.

 


o partitionBy
partitionBy()는 RDD가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
사용할 때 Partitoner 클래스의 인스턴스를 인자로 전달해야 한다.
Partitioner는 각 요소의 키를 특정 파티션에 할당하는 역할을 수행하는데 스파크에서 기본적으로 제공하는 것으로 HashPartitioner와 RangePartitioner의 두 종류가 있다. 만약 RDD의 파티션 생성 기준을 변경하고 싶다면 직접 

Partitioner 클래스를 상속하고 커스트마이징한 뒤 partitonBy() 메소드의 인자로 전달해서 사용하면 된다.

val rdd1 = sc.parallelize(List("apple","mouse","monitor"),5).map{a => (a, a.length) }
val rdd2 = rdd1.partitionBy(new HashPartitioner(3))
println(s"rdd1:${rdd1.getNumPartitons}, rdd2:${rdd2.getNumPartitons}")


[필터와 정렬 연산]
o filter
filter()는 단어의 뜻 그대로 RDD의 요소 중에서 원하는 요소만 남기고 원하지 않는 요소는 걸러내는 
동작을 하는 메소드이다. 동작 방식은 RDD의 어떤 요소가 원하는 조건에 부합하는지 여부를 참 or 거짓으로
가려내는 함수를 RDD의 각 요소에 적용해 그 결과가 참인 것은 남기고 거짓인 것은 버린다.

val rdd = sc.parallelize(1 to 5)
val result = rdd.filter(_ > 2)

filter 연산은 처음 RDD를 만들고 나서 다른 처리를 수행하기 전에 불필요한 요소를 사전에 제거하는
목적으로 많이 사용된다. 필터링 연산 후에 파티션 크기를 변경하고자 한다면 이전에 살펴본 coalesce() 
메소드를 사용해 RDD 파티션 수를 맞춰 조정할 수 있다.

 

o sortBykey
sortBykey는 키 값을 기준으로 요소를 정렬하는 연산이다. 키 값을 기준으로 정렬하기 때문에 단연히
모든 요소가 키와 값 형태로 구성되어 있어야 한다.

val rdd = sc.parallelize(List("q","z","a"))
val result = rdd.map((_,1)).sortByKey()

 

o keys, values
keys()와 values()는 RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
두 메소드의 역할은 이름만 보고도 짐잓할 수 있는데, keys()는 키에 해당하는 요소로 구성된 RDD를
생성하고 values()는 값에 해당하는 요소로 구성된 RDD를 돌려준다.

val rdd = sc.parallelize(List(("k1","v1"),("k2","v2"),("k3","v3"))
println(rdd.keys.collect.mkString(","))
println(rdd.values.collect.mkString(","))

 

o sample
sample() 메소드를 이용하면 샘플을 추출해 새로운 RDD를 생성할 수 있다.

sample(withReplacement: Boolean, fraction: Double, seed:Long = Utils.random.nextLong): RDD[U]

withReplacement는 복원 추출을 수행할지 여부를 결정하는 것으로 true로 설정되면 복원추출을 수행한다.
fraction은 복원 추출과 비복원 추출일 경우 의미가 달라진다. 복원 추출일 경우 샘플 내에서 각 요소가 
나타나는 횟수에 대한 기대값, 즉 각 요소의 평균 발생 횟수를 의미하며, 반드시 0 이상의 값을 지정해야 한다.
비복원 추출일 경우에는 각 요소가 샘플에 포함될 확률을 의미하며 0과 1사이의 값으로 지정할 수 있다.
seed는 일반적인 무작위 값 추출 시 사용하는것과 유사한 개념으로 반복 시행시 결과가 바뀌지 않고 일정한 

값이 나오도록 제어하는 목적으로 사용할 수 있다.

sample메소드가 샘플의 크기를 정해놓고 추출을 실행하는 것인 아니다. fraction 0.5를 지정한다고 해서
전체 크기의 절반에 해당하는 샘플이 추출되는 것으로 기대해서는 안된다.
정확한 크기의 샘플을 추출하고자 한다면 takeSample 메소드를 사용해야 한다.

val rdd = sc.parallelize(1 to 100)
val result1 = rdd.sample(false, 0.5)
val result2 = rdd.sample(true, 1.5)

 

[참고] 위키북스 스파크2 프로그래밍 을 정리한 내용임

'Spark > Spark2Programming' 카테고리의 다른 글

Spark SQL  (0) 2019.12.04
Spark RDD  (0) 2019.11.26
Spark RDD Action  (0) 2019.11.11

RDD Action

액션은 메소드 중에서 결과값이 정수나 리스트, 맵 등 RDD가 아닌 다른 타입인 것을 통칭해서 부르는 용어이다.
결과값이 RDD인 메소드는 트랜스포메이션이라는 유형으로 분류되는데 모두 느슨한 평가(lazy evaluation) 라는 
방식을 채택하고 있다는 공통점을 가지고 있다.

느슨한 평가 방식을 따른다는 것은 메소드를 호출하는 시점에 바로 실행하는 것이 아니고 계산에 필요한
정보를 누적해서 내포하고 있다가 실제로 계산이 필요한 시점에 실행된다는 것을 의미한다. 이런 방식은
소위 함수형 프로그래밍 과정에서 특히 자주 활용되며, 언어에 따라 조금씩 다른 설계 및 구현 방법이 
알려져 있다.

RDD의 경우 트랜스포메이션에 속하는 메소드는 느슨한 평가 방식을 사용한다. 따라서 호출하는 시점에
실해되는 것이 아니라 액션으로 분류되는 메소드가 호출돼야만 비로소 실행된다.

o first
first 연산은 RDD 요소 가운데 첫 번째 요소를 반환한다. 스파크 쉡에서 작업할 때 트랜스포메이션의 수행 결과
등을 빠르게 확인하는 용도로 활용할 수 있다.

val rdd = sc.parallelize(List(5,4,1))
val result = rdd.first

o take
take()는 RDD의 첫 번째 요소부터 순서대로 n개를 추출해서 되도려주는 메소드이다. 원하는 데이터를 찾기 위해
RDD의 전체 파티션을 다 뒤지지는 않지만 최종 결과 데이터는 배열 혹은 리스트와 같은 컬렉션 타입으로 반환하기 
때문에 지나치게 큰 n값을 지정하면 메모리 부족 오류가 발생할 수 있다.

val rdd = sc.parallelize(1 to 20, 5)
val result = rdd.take(5)

o takeSample
takeSample은 RDD 요소 가운데 지정된 크기의 샘플을 추출하는 메소드이다. sample 메소드와 유사하지만 샘플의
크기를 지정할 수 있다는 점과 결과 타입이 RDD가 아닌 배열이나 리스트 같은 컬렉션 타입이라는 차이점이 있다.

val rdd = sc.parallelize(1 to 100)
rdd.takeSample(false, 10)

o collect
RDD의 모든 원소를 모아서 배열 혹은 리스트 같은 컬렉션을 반환한다.
모든 요소들이 collect 연산을 호출한 서버의 메모리에 수집된다.
따라서 전체 데이터를 담을 수 있을 정도의 충분한 메모리 공간이 있어야 한다.

val rdd = sc.parallelize(1 to 10)
val result = rdd.collect
println(result.mkString(", "))

mkString은 리스트에 담긴 요소를 하나의 문자열로 표현하는 메소드


o count
RDD를 구성하는 전체 요소의 개수를 반환한다.
 
val result = rdd.count
println(result)

o countByValue
countByValue()는 RDD에 속하는 각 값들이 나타나는 횟수를 구해서 맵 형태로 반환한다.

val rdd = sc.parallelize(List(1,1,2,3,3,))
val result = rdd.countByBalue

o reduce
reduce는 RDD에 포함된 임의의 값 두개를 하나로 합치는 함수를 이용해 RDD에 포함된 모든
요소를 하나의 값으로 병합하고 그 결과값을 반환한다. 

def reduce(f: (T,T) => T): T

f라고 표시된 것이 리듀스 메소드의 인자로 전달되는 함수를 의미하는데, T 타입의 변수 두 개를
입력받아 역시 동일한 T 타입의 값을 돌려주는 함수를 사용하고 있다. 이때 T 타입은 RDD에 포함된
요소의 타입을 의미한다.
따라서 RDD[Int] 타입에서는 정수값 두 개를 입력으로 받아 정수값을 반환하는 함수를 사용하고,
RDD[List] 타입의 RDD에서는 리스트 두 개를 입력으로 받아 리스트를 반환하는 함수를 사용해야 한다.

스파크 어플리케이션이 클러스터 환경에서 동작하는 분산 프로그램이기 때문에 실제 병합이 첫 번째 요소부터
마지막 요소까지 순서대로 처리되는 것이 아니고 각 서버에 있는 파티션 단위로 나눠져서 처리된다. 따라서
리뷰스 메소드에 적용하는 병합 연산은 RDD에 포함된 모든 요소에 대해 교환법칙과 결합법칙이 성립되는 경우에만
사용 가능하다.

val rdd = sc.parallelize(1 to 10, 3)
val result = rdd.reduce( _ + _)

o fold
fold는 reduce와 같이 RDD 내이 모든 요소를 대상으로 교환법칙과 결합법칙이 성립되는 바이너리 함수를 
순차 적용해 최종 결과를 구하는 메소드이다. 
fold와 reduce의 차이점은 reduce 연산이 RDD에 포함된 요소만 이용해 병합을 수행하는 데 반해 fold 연산은
벙합 연산의 초기값을 지정해 줄 수 있다

def fold(zeroValue: T)(op: (T,T) => T)

op는 병합에 사용하는 함수를 의미하는 것으로, reduce에서도 동일한 형태로 정의된 함수를 사용하고 있다.
zeroValue에 해당하는 값은 병합이 시작될 때 op 함수의 첫 번째 인자값으로 전달된다.
fold 메소드도 reduce와 마찬가지로 여러 서버에 흩어진 파티션에 대해 병렬로 처리된다. 따라서 fold에 지정된
초기값은 각 파티션별로 부분 병합을 수행할 때마다 사용되기 때문에 여러 번 반복 적용돼도 문제가 없는 값을
사용해야 한다.

val rdd = sc.parallelize(1 to 10, 3)
val result = rdd.fold(0)(_ + _)

reduce 연산의 경우 RDD에 포함된 요소만 병합에 수행하기 때문에 파티션에 속하는 원소가 하나도 없다면
해당 파티션에서는 처리가 수행되지 않지만 fold 연산의 경우 파티션에 속하는 요소가 하나도 없더라도
초기값으로 인해 최소 한번의 연산이 수행된다.

o aggregate
reduce나 fold 메소드의 경우 모두 입력과 출력 타입이 동일해야 한다는 제약이있다.
aggregate 메소드는 타입 제갸 사항이 없기 때문에 입력과 출력의 타입이 다른 경우에도 사용할 수 있다.

메소드의 인자로 3개를 사용하는데 첫 번째는 fold메소드와 유사한 초기값을 지정하는 것이고, 두 번째는
각 파티션 단위 부분합을 구하기 위한 병합함수, 그리고 이렇게 파티션 단위로 생성된 부분합을 최종적으로
하나로 합치기 위한 또 다른 병합함수로 구성된다.

def aggregate[U](zeroValue: U)(seqOp: (U,T) => U, comOp: (U,U) => U)(implicit arg0: ClassTag[U]): U

첫 번째 인자인 zeroValue는 병합의 초기값으로 사용할 값이다. 두 번째 인자인 seqOp는 U와 T타입의 값을
입력값으로 전달받아 U 타입의 값을 돌려주고 있는데 이때 T는 RDD의 요소들이 갖는 타입을 의미하며, U는
zeroValue로 전달했던 초기값과 같은 타입을 의미한다. 예를 들어, RDD[String] 타입의 RDD에서 zeroValue로
비어있는 Set을 사용했다면 seqOp는 (Set,String) => Set과 같은 형태가 돼야 한다.
aggregate는 두 단계에 결처 병합ㅇ르 처리하는데 첫 번째는 파티션 단위로 병합을 수행하고 두 번째 단계에서는
파티션 단위 병합 결과끼리 다시 병합을 수행해서 최종 결과를 생성한다. 이때 파티션 단위 병합에는 첫 번째 인자인
seqOp 함수가 사용되며 두 번재 최종 병합에는 두 번째 인자인 combOp 함수가 사용된다.

val rdd = sc.parallelize(List(100, 80, 75, 90, 95,), 3)
val zeroValue = Record(0,0)
val seqOp = (r:Record, v:Int) => r.add(v)
val comOp = (r1:Record, r2:Record) -> r1 add r2
val result rdd.aggregate(zeroValue)(seqOp, combOp)

val result2 = rdd.aggregate(Record(0,0))(_ add _, _ add _)

o sum
RDD를 구성하는 요소의 타입에 따라 좀 더 특화된 편리한 연산을 제공하기 위해 특정 타입의 요소로 구성된 
RDD에서만 사용 가능한 메소드를 정의하고 있다. 대표적인 경우로 키와 값 형태의 요소를 위한 reduceByKey나
combineBykey 등을 들 수 있다.
sum 메소드도 그중 하나로서, RDD를 구성하는 모든 요소가 double, Long 등 숫자 타입일 경우에만 사용 가능하며
전체 요소의 합을 구한다.

val rdd = sc.parallelize(1 to 10)
val result = rdd.sum

o foreach, foreachPartition
foreach는 RDD의 모든 요소에 특정 함수를 적용하는 메소드이다. 인자로 한개의 입력값을 가지는 함수를 전달받는데
이렇게 전달받은 함수에 각 RDD 요소를 하나씩 입력값으로 사용해 해당 함수를 실행한다.

foreachPartition도 foreach와 같이 실행할 함수를 인자로 전달받아 사용하는데, foreach와 다른 점은 해당 함수를
개별 요소가 아닌 파티션 단위로 적용한다는 점이다. 이와 유사한 메소드로는 mapPartitions, mapPartitionsWithIndex가 
있는데 mapPartitions가 함수의 실행 결과로 새로운 RDD를 되돌려 주는 데 반해 foreachPartitions는 함수를 실행할 뿐
결과값을 돌려주지 않는다는 차이점이 있다.

val rdd = sc.parallelize(1 to 10, 3)
rdd.foreach { v =>
  println(s"Value Side Effect : ${v}")
}
rdd.foreachPartition(values => {
  println("Partition Side Effect!")
  for(v <- values println(s"Value Side Effect : ${v})
)

o toDebugString
toDebugString 메소드는 이름 그대로 디버깅을 위한 메소드이다. RDD의 파티션 개수나 의존성 정보 등 세부정보를 알고
싶을때 사용할 수 있다. 

val rdd = sc.parallelize(1 to 100, 10).map(_ * 2).persist.map(_ + 1).coalesce(2)

println(rdd.toDebugstring)

o cache, persist, unpersist
RDD는 액션 연산이 수행될 때마다 관련 트랜스포메이션 연산을 반복한다. 이때 기존에 사용했던 데이터가 메모리에 남아
있다면 그 데이터를 사용하지만 다른 이유로 인해 데이터가 남아 있지 않다면 RDD 생성 히스토리를 이용해 복구하는
단계를 수행한다. 따라서 다수의 액션 연산을 통해 반복적으로 사용되는 RDD인 경우 데이터를 메모리 등에 저장해 두는 
것이 매번 새로운 RDD를 만들어 내는 것보다 유리하다.

cache와 persist는 첫 액션을 실행한 후 RDD 정보를 메모리 또는 디스크 등에 저장해서 다음 액션을 수행할 때 불필요한
재성성 단계를 거치지 않고 원하는 작업을 즉시 실행할 수 있게 해주는 메소드이다. 이 중에서 cache는 RDD의 데이터를
메모리에 저장하라는 의미로, 만약 메모리 공간이 충분치 않다면 부족한 용량만큼 저장을 수행하지 않게 된다. 이에 반해
persist는 StorageLevel이라는 옵션을 이용해 저장 위치와 저장방식 등을 상세히 지정할 수 있는 기능을 제공한다.
unpersist 메소든느 이미 저장 중인 데이터가 더 이상 필요없을 때 캐시 설정을 취소하는데 사용된다.

val rdd = sc.parallelize(1 to 100, 10)
rdd.cache
rdd.persist(StorageLevel.MEMORY_ONLY)

o partitions
partitins는 RDD의 파티션 정보가 담긴 배열ㅇ르 돌려준다. 이때 배열에 담김 요소는 Partition 타입 객체이며, 파티션의
인덱스 정보를 알려주는 index() 메소드를 포함하고 있다. partitions는 파티션의 크기를 알아보기 위한 용도로 많이
활용하기도 하는데 단순히 크기 정보만 알아볼 목적이라면 getNumPartitions 메소드를 사용하는 것이 더 편리하다.

val rdd = sc.parallelize(1 to 100, 10)
println(rdd.partitinos.size)
println(rdd.getNumPartitions)

[참고] 스파크2 프로그래밍 위키북스

'Spark > Spark2Programming' 카테고리의 다른 글

Spark SQL  (0) 2019.12.04
Spark RDD  (0) 2019.11.26
Spark RDD Transformation  (0) 2019.11.11

+ Recent posts