zeroad 2019. 12. 4. 05:46

RDD는 장점은 분산 환경에서 메모리 기반으로 빠르고 안정적으로 동작하는 프로그램을 작성할 수  있다는 점이다. 하지만 성능 못지 않게 중요한 것이 바로 RDD가 제공하는 풍부한 처리 연산이라고 할 수 있다.
하지만 RDD의 아쉬운 점은 바로 데이터 값 자체는 표현이 가능하지만 데이터에 대한 메타 데이터, 소위 스키마에
대해서는 표현할 방법이 따로 없다는 점이다. 
스파크 SQL은 RDD의 이 같은 단점을 보완할 수 있도록 또 다른 유형의 데이터 모델과 API를 제공하는 스파크 모듈이다.

o 데이터셋
데이터셋은 스파크 1.6 버전에서 처음 소개된 데이터 모델이자 API이다. RDD와 마찬가지로 분산 오브젝트 컬렉션에 대한 프로그래밍 모델이며, 크랜스포메이션과 액션 연산을 포함하고 있다.

데이터셋 이전에는 데이터프레임이라는 API를 사용했는데 이것의 가장 큰 특징은 기존 RDD와는 전혀 다른 형태를 가진
SQL과 유사한 방식의 연산을 제공했다는 점이다. 숫자만으로 구성된 RDD의 모든 요소에 1을 더하기 위해 rdd.map(v => v+1)과
같은 map()연산을 상요한 것에 반해 동일한 요소로 구성된 데이터프레임에서는 df("col")+1과 같은 방법을 사용했다.
이러함 데이터 프로엠은 RDD에 비해 풍부한 API와 옵티마이저를 기반으로 높은 성능으로 복잡한 데이터 처리를 더욱 수월하게
수행할 수 있다는 장점이 있었지만 처리해야하는 작업의 특성에 따라서는 직접 프로그래밍이 가능한 RDD를 사용하는 것에 비해
복잡한 코드를 작성해야 하거나 컴파일 타입 오류 체크 기능을 사용할 수 없다는 단점이 있었다.

스파크2.0부터 데이터프레임이 데이터셋과 통합되면서 데이터셋은 RDD와 데이터프로엠이 제공하던 대부분의 기능을 지원할 수 있게 되었다. 즉 이제는 데이터셋 하나만 사용해 RDD에서 제공하던 형태의 연산도 사용할 수 있고, 기존 데이터프레밍과 같은 SQL기반의 연산도 사용할 수 있게 된 것이다.

연산의 종류와 주요 API
데이터셋에서 제공하는 연산은 RDD와 마찬가지로 액션과 트랜션포메이션이라는 두 종류로 분류할 수 있다.
RDD와 같이 데이터셋을 생성하는 연산은 트랜스포메이션 연산으로, 실제 데이터 처리를 수행하고 결과를 생성하는 연산은 액션 연산으로 분류된다.

데이터셋의 트랜스포메이션은 데이터 타입을 처리하는 방법에 따라 타입연산(typed operations)과 비타입연산(untyped operations)으로 나눌 수 있다. 

val data = 1 to 100 toList
val df = data.toDS
val result = df.map(_+1)
ds.select(col("value")+1)

map(_ + 1)은 데이터의 타입인 정수(Int)로 처리하는 반면 col("value")+1은 마치 데이터베이스의 테이블과 유사하게 처리한 것으로, ds라는 테이블에서 value 컬럼에 1을 더한 결과를 조회한 것과 유사하다. 이때 coㅣ("value")라는 부분은 바로 컬럼을 나타내는 부분으로 여기서 중요한 것은 그 타입이 원래 데이터 타압인 정수(Int)가 아닌 org.apache.spark.sql.Column 타입이라는 점이다.

실제로 데이터셋에는 SQL과 유사한 방식의 데이터 처리를 위해 데이터베이스의 로우(Row)와 칼럼(Column)에 해당하는 타입을 정의하고 있으며, 실제 데이터가 어떤 타입이든지 로우와 컬럼 타입으로 감싸서 처리할 수 있는 구조를 띠고 있다.

비타입 연산이라 데이터를 처리할 때 데이터 본래의 타입이 아닌 org.apache.spark,sql.Row와 org.apache.spark.sql.Column 타입의 객체로 감싸서 처리하는 연산이라고 할 수 있다.
스파크2.0부터는 데이터프레임을 가리켜 Row 타입 데이터셋 이라는 용어를 사용하기도 하는데 그 이유는 데이터프레임이라는 용어가 곧 org.apache.spark.sql.Row 타입의 요소를 포함하고 있는 데이터셋을 가리키는 것이기 때문이다. 


데이터셋과 데이터프레임의 주요 프로그래밍 구성 요소
- 스파크세션(SparkSession)
- 데이터셋(Dataset)
- 데이터프레임(DataFrame)
- DataFrameReader
- DataFrameWriter
- Row, Column
- functions
- StructType, StructField
- GroupedData, GroupedDataSet


o 스파크세션(SparkSession)
스파크세션은 데이터프레임(DataFrame) 또는 데이터셋(DataSet)ㅇ르 생성하거나 사용자 정의 함수(UDF)를 등록하기 위한 목적으로 사용되며, 스파크 SQL 프로그램은 가장 먼저 스파크세션을 생성하는 것으로 시작해야 한다.

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Sample").master("local[*]").getOrCreate()


o 데이터프레임, 로우, 컬럼
데이터프레임은 스파크SQL에서 사용하는 분산 데이터 모델이다. 스파크2.0 미만 버전에서는 데이터프레임이라는 별도의 클래스를 가리키는 용어였지만 스파크2.0부터 스파크 SQL의 또 다른 데이터 모델인 데이터셋과 통합되면서 org.apache.spark.sql.Row 타입의 요소를 가진 데이터셋을 가리키는 별칭으로 사용되고 있다.

스파크의 데이터프레임은 R의 데이터프레임이나 데이터베이스의 테이블과 비슷한 행(Row)과 열(Column)의 구조를 가지며, 데이터프레임에  포함된 데이터는 SQL문을 사용하거나 데이터프레임이 제공하는 프로그래밍 API를 이용해 처리할 수 있다.

데이터프레임을 사용하는 궁긍적인 목적은 RDD와 같이 분산 데이터를 저장하고 처리하기 위한 것이지만 RDD가 데이터와 값을 다루는데 초점을 맞추고 있다면 데이터프레임은 데이터 값뿐만아니라 데이터에 댛나 스키마 정보까지 함께 다루고자 한다는 점에서 차이가 있다.

외부 데이터소스로부터 데이터프레임 생성
파일이나 데이터베이스 같은 외부 저장소에 저장돼 있는 데이터를 읽어서 데이터프레임을 생성할 때 가장 손쉽게 처리할 수 있는 방법은 스파크세션이 제공하는 read() 메소드를 사용하는 것이다.
read() 메소드는 다양한 유형의 데이터소스로부터 데이터프레임을 생성할 수 있는 DataFrameReader 인스턴스를 생성하는데, 이 DataFrameReader를 이용해 데이터프레임을 생성할 수 있다.

-JSON 
val df = spark.read.format("json").option("allowComments","true").
         load("/examples/src/main/resources/people.json")


기존 RDD 및 로컬 컬렉션으로부터 데이터프레임 생성
스파크SQL은 스키마를 지정하는 두 가지 방법을 제공한다. 첫 번째는 리플렉션 API를 활용해 데이터의 스키마 정보를 자동으로 추론해 내는 방법으로 스키마 정의를 위한 별도의 추가 코드가 필요 없기 때문에 간결한 코드를 작성할 수 있다. 두 번째 방법은 직접 스키마 정보를 코드로 작성해서 지정하는 방법인데, 스키마 추론을 위한 부가적인 연산을 줄이고 스키마 정보를 원하는 대로 커스커마이징해서 사용할 수 있다.

1.리플렉션을 통한 데이터프레임 생성
특별한 스키마 정의를 추가하지 않아도 컬렉션에 포함된 오브젝트의 속성값으로부터 알아서 스키마 정보를 추출하고 데이터프레임을 생성한다.
리플렉션 방식을 사용하는 방법은 언어별로 차이가 있다. 스칼라의 경우 scala.Product의 하위 타입인 케이스클래스(Case Class)나 튜플(Tuple) 등의 컬렉션을 사용할 수 있다.

- List
val row1 = Persion("hayoon",7,"student")
val row2 = Persion("sunwoo",7,"student")
val row3 = Persion("hajoo",5,"kindergartener")
val data = List(row1, row2, row3)
val df = spark.createDataFrame(data)

- RDD
val rdd = sc.parallelize(data)
val df = spark.createDataFrame(rdd)

- Tuple
컬럼명은 _1, _2로 자동 부여
val row1 = ("col_1","col2")
val row2 = ("col_1","col2")
val data = list(row1, row2)
val df = spark.createDataFrame(data)


2.명시적 타입 지정을 통한 데이터프레임 생성
RDD의 모든 데이터가 실제 타입과 무관하게 문자열로만 구성돼 있거나 같은 값이라도 상황에 따라 데이터의
타입을 다르게 상용하고 싶다면 원하는 대로 스키마를 지정할 수 있는 방법이 필요하다.

스파크SQL은 직접 스키마를 지정할 수 있는 방법을 제공한다.
데이터프레임의 스키마 정보는 컬럼을 나타내는 StructFile, 로우를 나타내는 StructType으로 정의한다.
StructField에는 컬럼의 이름과 타입, null 허용 여부를 지정하고, StructType에는 컬럼으로 사용할  StructField의 목록을 지정한다. 이때 StructField의 사용 가능한 타입은 org.apache.spark,sql.types,DataType 추상 클래스의 하위 클래스로 정의돼 있다.

val sf1 = StructField("name",StringType, nullable=true)
val sf2 = StructField("age",IntegerType, nullable=true)
val sf3 = StructField("job",StringType, nullable=true)
val schema = StructType(List(sf1, sf2, sf3))

val rows = sc.parallelize(List(Row("hayoon",7,"student"),Row("sunwoo",7,"student"), Row("hajoo",5,"kindergartener"))
val df = spark.createDataFrame(rows, schema)

 

[참고] 위키북스 "스파크2 프로그래밍" 을 정리한 내용임