도커 이미지 조회

docker images

docker images | grep hello

docker images --filter "dangling=true"

-- image id
docker images -q


Docker image를 public repository부터 다운로드

private repository도 사용 가능

docker pull [OPTIONS] NAME[:TAG|@DIGEST]

docker pull ubuntu:18.04


Docker image는 컨테이너에서 사용하지 않는 경우 삭제 가능

사용 중인 image를 강제로 삭제하려면 -f  옵션을 추가한다.

docker ps -a

docker rmi [IMAGE1] [IMAGE2] [IMAGE3] 

docker rmi hello-world

docker rmi -f hello-world

 

쉘 기반으로 한꺼번에 삭제하기

docker rmi -f $(docker images -q)

-- container 삭제
docker rm -f $(docker ps -aq --filter ancestor=hello-world)

 

 

사용하지 않는 이미지 삭제하기

docker image prune

'Docker' 카테고리의 다른 글

Docker 설치  (0) 2023.09.19
Docker Volume  (0) 2022.11.13
Docker 개요  (0) 2022.11.04
Docker 사용법  (0) 2022.11.03

[ Linux Ubuntu ]

우선 기존에 설치된 docker 삭제하고

데이터, config, symlink와 service 파일도 정리한다.

sudo apt get remove docker-desktop
rm -r $HOME/.docker/desktop
sudo rm /usr/local/bin/com.docker.cli
sudo apt purge docker-desktop

 

Docker 설치 전 package repository 업데이트

sudo apt-get update

 

Docker Repository설치, 필요한 package와 GPG 키 설치, Docker 공식 apt를 저장소에 추가

sudo apt-get install ca-certificates curl gnupg
sudo install -m 0755 -d /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
sudo chmod a+r /etc/apt/keyrings/docker.gpg
echo \
  "deb [arch="$(dpkg --print-architecture)" signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \
  "$(. /etc/os-release && echo "$VERSION_CODENAME")" stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get update

 

Docker SW 설치

sudo apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin

 

Docker 설치 버전과 프로세스 확인

docker --version

sudo systemctl status docker

 

예제 hello-world image 실행

sudo docker run hello-world

 

[ 사용자 권한 ]

Docker 설치하면 root가 아닌 일반 사용자는 오류가 발생한다.

이런 경우 docker group에 사용자를 할당 해줘야 한다

permission denied while trying to connect to the Docker daemon socket at unix:///var/run/docker.sock: 
Get "http://%2Fvar%2Frun%2Fdocker.sock/v1.24/images/json": dial unix /var/run/docker.sock: connect: permission denied

 

먼저 docker 그룹 생성하고 사용자를 그룹에 할당한다

Docker를 설치하면 docker 그룹이 자동으로 생성되지만 없으면 새로 생성한다.

재접속하거나 newgrop으로 그룹을 다시 시작한다.

cat /etc/group
cat /etc/passwd

sudo groupadd docker

sudo usermod -aG docker $USER

newgrp docker

 

 

[참조] https://docs.docker.com/engine/install/ubuntu/

'Docker' 카테고리의 다른 글

Docker - image  (0) 2023.09.19
Docker Volume  (0) 2022.11.13
Docker 개요  (0) 2022.11.04
Docker 사용법  (0) 2022.11.03

도커 볼륨 (Volume)

도커 이미지로 컨테이너를 생성하면 이미지는 읽기 전용이 되며 
컨테이너의 변경 사항만 별도로 저장해서 각 컨테이너의 정보를 보존한다.

컨테이너 실행 후 내부에서 생성된 파일이나 데이터는 컨테이너가 삭제되면 함께 사라진다.
컨테이너의 데이터를 영속성(persistent) 데이터로 활용할 수 있는 방법이 바로 볼륨이다.

컨테이너 자체는 상태가 없고 상태를 결정하는 데이터를 외부에 저장/관리하여
컨터이너가 삭제되도 데이터가 보존되도록 하는 stateless 컨테이너 설계가 중요

 

 

볼륨을 활용하는 방법에는 여러 가지가 있다 .

호스트와 볼륨을 공유, 볼륨을 사용하는 컨테이너를 공유, 도커 자체 볼륨을 활용하는 방법이다.

○ 호스트 볼륨 공유

-v 옵션을 이용해 호스트 위치를 지정하고 컨테이너에서 참조할 path명을 설정한다.

-v [호스트 디렉토리:컨테이너 디렉토리]

 

호스트 d:\docker\volume 디렉토리를 컨테이너에서 volume이란 Path로 공유한다.

여러 컨테이너에서 동일한 호스트 디렉토리를 공유할 수 있다.

 

docker run -v d:\docker\volume:/volume --rm -it --name myvol myvolume:1.0
>cd volume
>touch vol.txt
--> d:\docker\volume\vol.txt

docker run -v d:\docker\volume:/volume --rm -it --name mycache mycache:0.0
>cd volume
>ls vol.txt


볼륨 컨테이너
-v 옵션을 사용하는 컨테이너를 다른 컨테이너에서 공유하여 볼륨을 사용한다.

docker run -v d:\docker\volume:/volume --rm -it --name myvol myvolume:1.0

docker run --rm -it --volumes-from myvol --name mycache  mycache:0.0
>cd volume
>ls vol.txt


○ 도커 볼륨
도커 자체에서 제공하는 볼륨을 활용하여 컨테이너에서 사용한다.

실제 물리적인 저장 위치를 고민할 필요가 없다.

-v [볼륨이름:컨테이너 디렉토리]

 

-- volume 생성
docker volume create myvol

 

-- volume 조회
docker volume ls

 

-- volume 구조
docker volume inspect myvol

 

-- 컨테이너에서 volume 사용
docker run -v myvol:/volume --rm -it --name myvol myvolume:1.0

 

-- 미사용 volume 삭제
docker volume prune

 

'Docker' 카테고리의 다른 글

Docker - image  (0) 2023.09.19
Docker 설치  (0) 2023.09.19
Docker 개요  (0) 2022.11.04
Docker 사용법  (0) 2022.11.03

도커(Docker)는 리눅스 응용 프로그램들을 프로세스 격리 기술들을 사용해 컨테이너로 실행하고 관리하는 오픈 소스 프로젝트이다.

 

컨테이너는 소프트웨어 서비스를 실행하는 데 필요한 특정 버전의 프로그래밍 언어 런타임 및 라이브러리와 같은 종속 항목과 애플리케이션 코드를 함께 포함하는 경량 패키지다.

컨테이너는 운영체제 수준에서 CPU, 메모리, 스토리지, 네트워크 리소스를 쉽게 공유할 수 있게 해주며 컨테이너가 실제로 실행되는 환경에서 애플리케이션을 추상화할 수 있는 논리 패키징 메커니즘을 제공한다.

 

컨테이너는 OpenVZ, LXC, cri-o 등 여러 개가 있지만 도커가 사실상 표준으로 사용되고 있다

 

도커는 리눅스에서 운영 체제 수준 가상화의 추상화 및 자동화 계층을 추가적으로 제공한다.

리눅스 자체 기능인 namespace, cgroups, chroot 등과 같은 리눅스 커널의 기능들과 OverayFS, aufs와 같은 파일 시스템의 리소스 격리 기능을 사용하여  프로세스 단위의 독립된 가상 공간을 만든다.

도커는 리눅스 컨테이너에 여러 기능을 추가함으로써  애플리케이션을 컨테이너로서 좀 더 쉽게 사용할 수 있게 해준다.

 

도커 관련 프로젝트는 도커 컴포즈, 레지스트리, 도커허브, 도커 데스크탑 등 여러 가지가 있지만
도커라고 하면 일반적으로 도커 엔진을 의미한다.
도커 엔진은 컨테이너를 생성하고 관리하는 주체로써 컨테이너를 제어할 수 있는 다양한 기능을 제공하는 핵심기술로써

도커 엔진에서 사용하는 가장 중요한 요소가 이미지와 컨테이너

 

[Docker Architecture]

 

컨테이너에 필요한 커널은 호스트의 커널을 공유해 사용하고, 컨테이너 안에는 어플리케이션을 구동하는데 필요한

라이브러리 및 실행 파일만 존재한다.
도커 컨테이너는 호스트 OS 위에서 실행되는 격리된 공간이다.
컨테이너 내부에 수많은 SW를 설치하고 설정 파일을 수정해도 호스트 OS에는 영향을 끼치지 않는다.

컨테이너 내부에서 SW개발 및 라이브러리 설치 등의 작업을 마친 후 운영 환경에 배포할 경우
컨테이너를 도커 이미지로 패키징하여 운영 서버에서 실행할 수 있다.
운영 서버에서 SW 설치 및 라이브러리 의존성을 고민할 필요가 없다.
서비스 개발 환경을 다른 서버에서 컨테이너로 똑같이 복제할 수 있기 때문에 개발/운영 환경의 통합이 가능해진다.
컨테이너를 이미지로 만들어 배포하는 시간이 빠르고 가상화된 공간을 사용할때 성능 손실도 거의 없다.

 

○ 도커 이미지
컨테이너 생성할때 필요한 요소.

이미지는 여러 개의 계층으로 된 바이너리 파일, 컨테이너를 생성하고 실행할 때 읽기 전용으로 사용된다.

이미지 이름은 [저장소이름]/[이미지이름]:[태그] 로 구성된다.
- 저장소 : 이미지 저장된 장소. 기본 저장소는 도커 허브(Docker Hub)
- 이미지이름 : 이미지 역할, 종류 등 나타내는 이름
- 태그 : 이미지 버전, 생략시 latest 사용

○ 도커 컨테이너
이미지 목적에 맞게 독립된 SW, 자원 및 네트워크을 사용할 수 있는 독립된 공간

대부분의 도커 컨테이너는 생성될 때 사용된 이미지의 종류에 따라 알맞은 설정 파일을 가지게 된다.

하나의 이미지로 부터 여러 개의 컨테이너를 생성 할 수 있다.

컨테이너는 이미지를 읽기 전용으로 사용, 변경된 내용은 컨테이너 내부에 저장하므로 원래 이미지에 영향을 미치지 않는다.

생성된 컨테이너는 독립된 파일시스템을 사용, 호스트와 분리되어 있으므로

특정 컨테이너에서 어떤 애플리케이션이 설치하거나 삭제되어도 다른 컨테이너와 호스트에는 영향이 없다.

 

 

 

 

'Docker' 카테고리의 다른 글

Docker - image  (0) 2023.09.19
Docker 설치  (0) 2023.09.19
Docker Volume  (0) 2022.11.13
Docker 사용법  (0) 2022.11.03


[Docker Command]
o docker 정보
docker info
docker -v


o 도커 이미지 조회
docker images


o 도커 이미지 다운로드
docker image repository 부터 Docker image 를 다운로드
private repository 사용 가능

docker pull [OPTIONS] NAME[:TAG|@DIGEST]

docker pull ubuntu:18.04


o 도커 container 생성
이미지가 없는 경우는 이미지 다운로드 후 컨테이너 생성

docker create [OPTIONS] IMAGE [COMMAND] [ARG...]

docker create -i -t --name myub ubuntu:16.04
-i  상호 입출력
-t  tty활성화해서 bash 셀 사용
-it    -i와 -t를 동시에 사용한 것으로 터미널 입력을 위한 옵션


o 도커 container 생성 & 실행

컨테이너가 없는 경우 생성, 있다면 실행
rm은 컨테이너 종료시 저장된 내용 사라짐

docker run [OPTIONS] IMAGE[:TAG|@DIGEST] [COMMAND] [ARG...]

docker run --rm -it ubuntu:16.04
docker run --rm -it --name myserver ubuntu:16.04 /bin/bash


-d    detached mode 흔히 말하는 백그라운드 모드
-p    호스트와 컨테이너의 포트를 연결 (포워딩)
-v    호스트와 컨테이너의 디렉토리를 연결 (마운트)
-e    컨테이너 내에서 사용할 환경변수 설정
--name    컨테이너 이름 설정
--rm    프로세스 종료시 컨테이너 자동 제거
-i  상호 입출력
-t  tty활성화해서 bash 셀 사용
-it    -i와 -t를 동시에 사용한 것으로 터미널 입력을 위한 옵션
?link    컨테이너 연결 [컨테이너명:별칭]



o 도커 container 실행
컨테이너 ID나 이름을 이용하여 실행 / 진입  

docker start [OPTIONS] CONTAINER [CONTAINER...]

  -a, --attach               Attach STDOUT/STDERR and forward signals
      --detach-keys string   Override the key sequence for detaching a  container
  -i, --interactive          Attach container's STDIN

   
docker start -i myub
docker start -i 77d68f6b75f3
docker attach -i myub


o container command 실행

docker exec -it demo2 /bin/bash


o 도커 container exit
Ctrl + P,Q : 컨테이너를 종료하지 않고 exit
exit : 컨테이너 종료 후 exit


o 도커 container List
컨테이너 목록 조회
- PORTS : 컨테이너가 개방한 포트와 호스크 연결한 포트
- NAMES : 컨테이너 고유한 이름

docker ps [OPTIONS]

-a, --all             Show all containers (default shows just running)
-f, --filter filter   Filter output based on conditions provided
    --format string   Pretty-print containers using a Go template
-n, --last int        Show n last created containers (includes all   states) (default -1)
-l, --latest          Show the latest created container (includes all  states)
    --no-trunc        Don't truncate output
-q, --quiet           Only display container IDs
-s, --size            Display total file sizes


o 도커 container name 변경
docker create -i -t --name myub ubuntu:16.04
docker ps -a
docker rename vigorous_bhaskara myubt


o 도커 container 종료
docker stop myubt
docker stop $(docker ps -a -q)


o 도커 container 삭제
docker rm myubt
docker rm -f myubt


o 도커 image 삭제
docker rmi ubuntu 


o 도커 container 외부 노출
컨테이너는 가상 IP 주소 가짐. 기본 IP 172.17.0.x
기본적으로 도커가 설치면 호스트에서만 접근 가능, 외부에서 접근할 수 없음

host 8080 포트를 컨테이너 80포트로 연결
- http://172.29.32.1:7777  -> container:80
- localhost:7777은 연결 안됨

docker run -it -p 172.29.32.1:7777:80 --name webserver ubuntu:16.04


[ifconfig 설치]
apt update -y
apt install net-tools
ifconfig

[apache 설치]
apt-get update
apt-get install apache2 -y
service apach2 start

o Docker network
docker network ls
docker network inspect bridge

bride : 컨테이너 생성할때 자동으로 연결되는 docker() 브리지 활용
        172.17.0.x IP 대역을 순차적으로 할당


o 도커 container 로깅
docker logs [OPTIONS] CONTAINER

      --details        Show extra details provided to logs
  -f, --follow         Follow log output
      --since string   Show logs since timestamp (e.g.
                       2013-01-02T13:23:37Z) or relative (e.g. 42m for 42
                       minutes)
  -n, --tail string    Number of lines to show from the end of the logs
                       (default "all")
  -t, --timestamps     Show timestamps
      --until string   Show logs before a timestamp (e.g.
                       2013-01-02T13:23:37Z) or relative (e.g. 42m for 42
                       minutes)
                       
docker logs --tail 2 worddb
docker logs demo3
docker logs demo3 -f

-- log 옵션
docker run --rm -it --log-opt max-size=10k --log-opt max-file=3 ubuntu:16.04

-- syslog 설정
docker run --rm -d --name syslog_container --log-driver=syslog ubuntu:16.04 echo syslogtest

test 라는 이름의 busybox 이미지를 백그라운드에서 도커 컨테이너로 실행하여, 1초에 한 번씩 현재 시간을 출력
docker run --name demo3 -d busybox sh -c "while true; do $(echo date); sleep 1;done"


o 도커리소스 제한
-- 메모리
swap 메모리는 메모리의 2배로 설정, swap로 변경 가능(기본메모리보다 커야함)

docker run --rm -it --memory="1g" ubuntu:16.04
docker run --rm -it --memory=10m --memory--swap=50m ubuntu:16.04

-- CUP
1024를 기준으로 비율로 산정
docker run --rm -it --memory=100m --cpu-shares=1024 ubuntu:16.04
docker run --rm -it --memory=100m --cpu-shares=512 ubuntu:16.04

ps aux | grep stres

-- cpu 수 제한
cupset-cups=0.3은 1,3번째CPU, 0-2는 1,2,3번째 CPU
docker run --rm -it --memory=100m --cpuset-cpus=2 ubuntu:16.04
 
[htop] CPU 사용량 확인
agt-get install htop
yum -y install epel-release && yum -y install htop

 


o 도커 컨테이너 어플리케이션 구축
-d detached 모드로 백그라운드 실행
  사용자입력을 받지 않고 반드시 컨테이너에서 프로그램이 실행되어야 함
  포그라인드 프로그램이 실행하지 않으면 컨테이너는 종료됨
  docker ps로 확인할 수 없음-> ps -a 로 확인  
- link : 컨테이너에서 컨터이너로 접근.
  내부 IP와 포트를 사용하지 않고 컨테이터 alias로 접근
  컨테이너를 순차적으로 실행
 
docker run -d --name wordpressdb -e MYSQL_ROOT_PASSWORD=password 
      -e MYSQL_DATABASE=wordpress mysql:5.7
docker run --name wordpress -e WORDPRESS_DB_HOST=mysql -e WORDPRESS_DB_USER=root 
      -e WORDPRESS_DB_PASSWORK=password --link wordpressdb:mysql -p 80 wordpress

docker stop wordpress wordpressdb


o docker volume
docker run -d --name wordpressdb_v -e MYSQL_ROOT_PASSWORD=password -e MYSQL_DATABASE=wordpress 
      -v d:\\docker\NAS:/var/lib/mysql mysql:5.7
docker run --name wordpress_v -e WORDPRESS_DB_HOST=mysql -e WORDPRESS_DB_USER=root 
      -e WORDPRESS_DB_PASSWORK=password --link wordpressdb_v:mysql -p 80 wordpress

docker start -i wordpress_v
docker run --rm -it -v D://docker/home:/NAS ubuntu:16.04
docker run -d --name worddb -e MYSQL_ROOT_PASSWORD=password -e MYSQL_DATABASE=wordpress 
      -v d:\\docker\home\mysql:/var/lib/mysql mysql:5.7
docker run -d --name word -e WORDPRESS_DB_HOST=mysql -e WORDPRESS_DB_USER=root 
      -e WORDPRESS_DB_PASSWORK=password --link worddb:mysql -p 80 wordpress

'Docker' 카테고리의 다른 글

Docker - image  (0) 2023.09.19
Docker 설치  (0) 2023.09.19
Docker Volume  (0) 2022.11.13
Docker 개요  (0) 2022.11.04

RDD는 장점은 분산 환경에서 메모리 기반으로 빠르고 안정적으로 동작하는 프로그램을 작성할 수  있다는 점이다. 하지만 성능 못지 않게 중요한 것이 바로 RDD가 제공하는 풍부한 처리 연산이라고 할 수 있다.
하지만 RDD의 아쉬운 점은 바로 데이터 값 자체는 표현이 가능하지만 데이터에 대한 메타 데이터, 소위 스키마에
대해서는 표현할 방법이 따로 없다는 점이다. 
스파크 SQL은 RDD의 이 같은 단점을 보완할 수 있도록 또 다른 유형의 데이터 모델과 API를 제공하는 스파크 모듈이다.

o 데이터셋
데이터셋은 스파크 1.6 버전에서 처음 소개된 데이터 모델이자 API이다. RDD와 마찬가지로 분산 오브젝트 컬렉션에 대한 프로그래밍 모델이며, 크랜스포메이션과 액션 연산을 포함하고 있다.

데이터셋 이전에는 데이터프레임이라는 API를 사용했는데 이것의 가장 큰 특징은 기존 RDD와는 전혀 다른 형태를 가진
SQL과 유사한 방식의 연산을 제공했다는 점이다. 숫자만으로 구성된 RDD의 모든 요소에 1을 더하기 위해 rdd.map(v => v+1)과
같은 map()연산을 상요한 것에 반해 동일한 요소로 구성된 데이터프레임에서는 df("col")+1과 같은 방법을 사용했다.
이러함 데이터 프로엠은 RDD에 비해 풍부한 API와 옵티마이저를 기반으로 높은 성능으로 복잡한 데이터 처리를 더욱 수월하게
수행할 수 있다는 장점이 있었지만 처리해야하는 작업의 특성에 따라서는 직접 프로그래밍이 가능한 RDD를 사용하는 것에 비해
복잡한 코드를 작성해야 하거나 컴파일 타입 오류 체크 기능을 사용할 수 없다는 단점이 있었다.

스파크2.0부터 데이터프레임이 데이터셋과 통합되면서 데이터셋은 RDD와 데이터프로엠이 제공하던 대부분의 기능을 지원할 수 있게 되었다. 즉 이제는 데이터셋 하나만 사용해 RDD에서 제공하던 형태의 연산도 사용할 수 있고, 기존 데이터프레밍과 같은 SQL기반의 연산도 사용할 수 있게 된 것이다.

연산의 종류와 주요 API
데이터셋에서 제공하는 연산은 RDD와 마찬가지로 액션과 트랜션포메이션이라는 두 종류로 분류할 수 있다.
RDD와 같이 데이터셋을 생성하는 연산은 트랜스포메이션 연산으로, 실제 데이터 처리를 수행하고 결과를 생성하는 연산은 액션 연산으로 분류된다.

데이터셋의 트랜스포메이션은 데이터 타입을 처리하는 방법에 따라 타입연산(typed operations)과 비타입연산(untyped operations)으로 나눌 수 있다. 

val data = 1 to 100 toList
val df = data.toDS
val result = df.map(_+1)
ds.select(col("value")+1)

map(_ + 1)은 데이터의 타입인 정수(Int)로 처리하는 반면 col("value")+1은 마치 데이터베이스의 테이블과 유사하게 처리한 것으로, ds라는 테이블에서 value 컬럼에 1을 더한 결과를 조회한 것과 유사하다. 이때 coㅣ("value")라는 부분은 바로 컬럼을 나타내는 부분으로 여기서 중요한 것은 그 타입이 원래 데이터 타압인 정수(Int)가 아닌 org.apache.spark.sql.Column 타입이라는 점이다.

실제로 데이터셋에는 SQL과 유사한 방식의 데이터 처리를 위해 데이터베이스의 로우(Row)와 칼럼(Column)에 해당하는 타입을 정의하고 있으며, 실제 데이터가 어떤 타입이든지 로우와 컬럼 타입으로 감싸서 처리할 수 있는 구조를 띠고 있다.

비타입 연산이라 데이터를 처리할 때 데이터 본래의 타입이 아닌 org.apache.spark,sql.Row와 org.apache.spark.sql.Column 타입의 객체로 감싸서 처리하는 연산이라고 할 수 있다.
스파크2.0부터는 데이터프레임을 가리켜 Row 타입 데이터셋 이라는 용어를 사용하기도 하는데 그 이유는 데이터프레임이라는 용어가 곧 org.apache.spark.sql.Row 타입의 요소를 포함하고 있는 데이터셋을 가리키는 것이기 때문이다. 


데이터셋과 데이터프레임의 주요 프로그래밍 구성 요소
- 스파크세션(SparkSession)
- 데이터셋(Dataset)
- 데이터프레임(DataFrame)
- DataFrameReader
- DataFrameWriter
- Row, Column
- functions
- StructType, StructField
- GroupedData, GroupedDataSet


o 스파크세션(SparkSession)
스파크세션은 데이터프레임(DataFrame) 또는 데이터셋(DataSet)ㅇ르 생성하거나 사용자 정의 함수(UDF)를 등록하기 위한 목적으로 사용되며, 스파크 SQL 프로그램은 가장 먼저 스파크세션을 생성하는 것으로 시작해야 한다.

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Sample").master("local[*]").getOrCreate()


o 데이터프레임, 로우, 컬럼
데이터프레임은 스파크SQL에서 사용하는 분산 데이터 모델이다. 스파크2.0 미만 버전에서는 데이터프레임이라는 별도의 클래스를 가리키는 용어였지만 스파크2.0부터 스파크 SQL의 또 다른 데이터 모델인 데이터셋과 통합되면서 org.apache.spark.sql.Row 타입의 요소를 가진 데이터셋을 가리키는 별칭으로 사용되고 있다.

스파크의 데이터프레임은 R의 데이터프레임이나 데이터베이스의 테이블과 비슷한 행(Row)과 열(Column)의 구조를 가지며, 데이터프레임에  포함된 데이터는 SQL문을 사용하거나 데이터프레임이 제공하는 프로그래밍 API를 이용해 처리할 수 있다.

데이터프레임을 사용하는 궁긍적인 목적은 RDD와 같이 분산 데이터를 저장하고 처리하기 위한 것이지만 RDD가 데이터와 값을 다루는데 초점을 맞추고 있다면 데이터프레임은 데이터 값뿐만아니라 데이터에 댛나 스키마 정보까지 함께 다루고자 한다는 점에서 차이가 있다.

외부 데이터소스로부터 데이터프레임 생성
파일이나 데이터베이스 같은 외부 저장소에 저장돼 있는 데이터를 읽어서 데이터프레임을 생성할 때 가장 손쉽게 처리할 수 있는 방법은 스파크세션이 제공하는 read() 메소드를 사용하는 것이다.
read() 메소드는 다양한 유형의 데이터소스로부터 데이터프레임을 생성할 수 있는 DataFrameReader 인스턴스를 생성하는데, 이 DataFrameReader를 이용해 데이터프레임을 생성할 수 있다.

-JSON 
val df = spark.read.format("json").option("allowComments","true").
         load("/examples/src/main/resources/people.json")


기존 RDD 및 로컬 컬렉션으로부터 데이터프레임 생성
스파크SQL은 스키마를 지정하는 두 가지 방법을 제공한다. 첫 번째는 리플렉션 API를 활용해 데이터의 스키마 정보를 자동으로 추론해 내는 방법으로 스키마 정의를 위한 별도의 추가 코드가 필요 없기 때문에 간결한 코드를 작성할 수 있다. 두 번째 방법은 직접 스키마 정보를 코드로 작성해서 지정하는 방법인데, 스키마 추론을 위한 부가적인 연산을 줄이고 스키마 정보를 원하는 대로 커스커마이징해서 사용할 수 있다.

1.리플렉션을 통한 데이터프레임 생성
특별한 스키마 정의를 추가하지 않아도 컬렉션에 포함된 오브젝트의 속성값으로부터 알아서 스키마 정보를 추출하고 데이터프레임을 생성한다.
리플렉션 방식을 사용하는 방법은 언어별로 차이가 있다. 스칼라의 경우 scala.Product의 하위 타입인 케이스클래스(Case Class)나 튜플(Tuple) 등의 컬렉션을 사용할 수 있다.

- List
val row1 = Persion("hayoon",7,"student")
val row2 = Persion("sunwoo",7,"student")
val row3 = Persion("hajoo",5,"kindergartener")
val data = List(row1, row2, row3)
val df = spark.createDataFrame(data)

- RDD
val rdd = sc.parallelize(data)
val df = spark.createDataFrame(rdd)

- Tuple
컬럼명은 _1, _2로 자동 부여
val row1 = ("col_1","col2")
val row2 = ("col_1","col2")
val data = list(row1, row2)
val df = spark.createDataFrame(data)


2.명시적 타입 지정을 통한 데이터프레임 생성
RDD의 모든 데이터가 실제 타입과 무관하게 문자열로만 구성돼 있거나 같은 값이라도 상황에 따라 데이터의
타입을 다르게 상용하고 싶다면 원하는 대로 스키마를 지정할 수 있는 방법이 필요하다.

스파크SQL은 직접 스키마를 지정할 수 있는 방법을 제공한다.
데이터프레임의 스키마 정보는 컬럼을 나타내는 StructFile, 로우를 나타내는 StructType으로 정의한다.
StructField에는 컬럼의 이름과 타입, null 허용 여부를 지정하고, StructType에는 컬럼으로 사용할  StructField의 목록을 지정한다. 이때 StructField의 사용 가능한 타입은 org.apache.spark,sql.types,DataType 추상 클래스의 하위 클래스로 정의돼 있다.

val sf1 = StructField("name",StringType, nullable=true)
val sf2 = StructField("age",IntegerType, nullable=true)
val sf3 = StructField("job",StringType, nullable=true)
val schema = StructType(List(sf1, sf2, sf3))

val rows = sc.parallelize(List(Row("hayoon",7,"student"),Row("sunwoo",7,"student"), Row("hajoo",5,"kindergartener"))
val df = spark.createDataFrame(rows, schema)

 

[참고] 위키북스 "스파크2 프로그래밍" 을 정리한 내용임

'Spark > Spark2Programming' 카테고리의 다른 글

Spark RDD  (0) 2019.11.26
Spark RDD Transformation  (0) 2019.11.11
Spark RDD Action  (0) 2019.11.11

RDD는 스파크가 사용하는 기본 데이터 모델로서 RDD를 잘 이해하고 다루는 것은 스파크 어플리케이션을 작성할 때 가장 중요한 기초라고 할 수 있다.

RDD를 다루기 전 알아둬야할 중요한 사항
- 스파크 클러스터
- 분산 데이터로서의 RDD
- RDD의 불변성
- 파티션
- HDFS
- Job과 Executor
- 드라이버 프로그램
- 트랜스포메이션과 액션
- 지연(lazy) 동작과 최적화
- 함수의 전달

o 데이터 타입에 따른 RDD 연산
RDD는 데이터 처리에 유용한 다양한 연산을 수행할 수 있는 메소드를 제공한다. RDD가 제공하는 메소드는 데이터 타입과 밀접한 관계를 맺고 있다. 따라서 스파크에서 자주 사용되는 특별한 데이터 유형에 대해 좀 더 특화된 메소드를 제공할 수 있도록 클래스를 제공하고 있다. 
- PairRDDFunctions : 키와 값의 형태로 구성된 데이터 연산을 제공하는 클래스
  groupBykey나 reduceByKey, mapValues와 같이 키를 사용하는 유용한 연산을 제공한다.
- OrderedRDDFunctions : 키와 값으로 구성된 데이터를 대상으로 하며, 키에 해당하는 변수 타입이 정렬 가능한 경우

  를 위한 것이다. sortBykey와 같은 연산을 제공한다.
- DoubleRDDFunctions : double 유형의 데이터를 위한 것으로 합을 구하는 sum()이나 평균을 구하는 mean()과 같은 

  연산을 제공한다.
- SequenceFileRDDFunctions : 하둡의 시퀀스 파일을 다루기 위한 연산을 제공한다.

o 스파크컨텍스트 생성
스파크컨텍스트는 스파크 애플리케이션과 클러스터의 연결을 관리하는 객체로서 모든 스프크 애플리케이션은 반드시
스파크컨텍스트를 생성해야 한다. RDD를 비롯한 스파크에서 사용하는 주요 객체는 스파크컨텍스트를 이용해 생성 할 
수 있다.

val conf = new SparkConf().setMaster("local[*]").setAppName("RDDCreateSample")
val sc = new SparkContext(conf)

o RDD 생성
Spark는 두 종류의 RDD 생성 방법을 제공한다.
- 드라이버 프로그램의 컬렉션 객체를 이용
- 데이터베이스와 같은 외부 데이터를 읽어서 새로운 RDD를 생성

val rdd1 = sc.paralleize("a","b","c","d")   // 컬렉션 객체를 이용한 RDD 생성
val rdd2 = sc.parallelize(1 to 1000, 10)  // 파티션 개수 지정 
val rdd3 = sc.textfile("/README.md")    // 외부파일을 이용한 RDD 생성 

RDD가 제공하는 연산은 트랜스포메이션과 액션으로 나눌 수 있다. 두 연산을 구분하는 기준은 연산의 결과가 RDD인지 아닌지에 달려 있다. 트랜스포메이션은 기존 RDD를 이용해 새로운 RDD를 생성하는 연산이다.

o 클러스터 환경의 공유변수
하둡이나 스파크와 같이 클러스터 환경에서 동작하는 애플리케이션은 하나의 잡(Job)을 수행하기 위해 클러스터에
속한 다수의 서버에서 여러 개의 프로세스를 실행하게 되므로 모든 프로세스가 공유할 수 있는 자원을 관리하기란
쉽지 않다.
따라서 이러한 프레임워크들은 다수의 프로세스가 공유할 수 있는 읽기 자원과 쓰기 자원을 설정할 수 있또록 지원하는데
하둡은 분산캐시와 카운터를, 스파크는 브로드캐스드 변수(Broadcast Variables)와 어큐뮬레이터(Accumulators)를 제공한다.

o broadcast 변수
스파크의 잡이 실행되는 동안 클러스터 내의 모든 서버에서 공유할 수 있는 일기전용 자원을 설정할 수 있는 변수
예를 들어 사용자ID목록이 담긴 세트 컬렉션 타입의 데이터를 공유 변수로 설정해 각 서버에서 로그를 처리하며서 현재 처리
하려는 로그가 우리가 찾고 이쓴 사용자의 로그가 맞는지 확인하는 등의 용도로 사용할 수 있다.

먼저 공유 객체를 생성하고, 스파크컨텍스트의 broadcast 메소드의 인자로 등록한다.
이렇게 생성된 브로드캐스트 변수의 value() 메소드를 통해 접근한다.

val broadcastUsers = sc.broadcast(Set("u1","u2"))
val rdd = sc.parallelize(List("u1","u2","u3","u4","u5","u6"),3)
val result = rdd.filter(broadcastUsers.value.contains(_))

o accumulator 변수
브로드캐스트 변수가 읽기 동작을 위한 것이라면 어큐뮬레이터는 쓰기 동작을 위한 것이다. 
예를 들어 다수의 서버로 구성된 클러스터 환경에서 동작하는 프로그램인 경우 오류가 발생했을 때 이 문제가 어느 서버의 어떤
프로세스에서 발생한 것인지 확인하는 것이 쉽지 않다. 따라서 클러스트의 각 서버에서 보내는 에러 정보를 한 곳에 모아서 
볼 수 방법이 있다면 손쉽게 에러 상황을 디버깅할 수 있을 것이다.
어큐뮬레이터는 이처럼 클러스터 내의 모든 서버가 공유하는 쓰기 공간을 제공함으로써 각 서버에서 발생하는 특정 이벤트의 수를
세거나 관찰하고 싶은 정보를 모아 두는 등의 용도로 편리하게 활용할 수 있다.

어큐뮬레이터를 생성하려면 ora.apache.spark.util.AccumulatorV2 클래스를 상속받은 클래스를 정의하여 인스턴스를 생성해야 한다.
그리고 생성한 인스턴스를 스파크 컨텍스트에서 제공하는 register() 메소드를 이용하여 등록한다.

스파크에서는 자주 사용되는 몇 가지 데이터 타입에 대한 어큐뮬레이터를 미리 정의해 뒸으며 이를 편리하게 사용할 수 있게 생성과
등록 작업을 한번에 처리하는 간단한 메소드를 컨텍스트를 통해 제공한다.
- long Accumulator()
- collectionAccumulator()
- doubleAcculator()

val acc1 = sc.longAccumulator("invalidFormat")
val acc2 = sc.collectionAccumulator("invalidFormat2")
val data = List("U1:Addr1","U2:Addr2","U3","U4:Addr4","U5;Addr5","U6:Addr6","U7::Addr7")
sc.parallelize(data,3).foreach{ v =>
  if(v.split(":").length !=2){
    acc1.add(1L)
    acc2.add(v)
  }
}

자체 어큐뮬레이터를 정의하려면 org.apache.spark.util.AccumulatorV2 추상 클래스를 상속받고 isZero(), copy() 등의 필요한
메소드를 정의해야 한다. 이때 AccumulatorV2[Record, Long]에서 각각 입력 데이터 타입과 출력 데이터 타입을 의미한다.
isZero()의 경우 RecordAccumulator에 포함된 Record의 값이 초깃값에 해당하는지 여부를 확인하기 위한 것으로, 초깃값이 맞으면
true, 아니면 false를 돌려준다.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.util.AccumulatorV2

class RecordAccumulator extends AccumulatorV2[Record, Long]{
  private var _record = Record(0)
  def isZero:Boolean = _record.amount == 0 && _record.number==1
  def copy():AccumulatorV2[Record, Long] = {
    val newAcc = new RecordAccumulator
    newAcc._record = Record(_record.amount, _record.number)
    newAcc
  }
  df reset(): Unit = {
    _record.amount = 0L
    _record.number = 1L
  }
  def add(other: Record): Unit = {
    _record.add(other)
  }
  def merge(other: AccumulatorV2[Record, Long]): Unit = other match {
    case o :  RecordAccumulator => _record.add(o._record)
    case _ => throw new RuntimeException    
  }
  def value: Long = {
    _record.amount
  }
}

val acc = new RecordAccumulator
sc.register(acc, "invalidFormat")
val data = List("U1:Addr1","U2:Addr2","U3","U4:Addr4","U5;Addr5","U6:Addr6","U7::Addr7")
sc.parallelize(data,3).foreach{ v =>
  if(v.split(":").length !=2){
    acc.add(Record(1))
  }
}

어큐뮬레이터를 증가시키는 동작은 클러스터의 모든 데이터 처리 프로세스에서 가능하지만 데이터를 읽는 동작은
드라이버 프로그램 내에서만 가능하다. 즉 RDD의 트랜스포메이션이나 액션 연산 내부에서는 어큐뮬레이터의 값을
증가시킬 수만 있을 뿐 그 값을 참조해서 사용하는 것은 불가능하다.
일부러 의도한 특별한 목적이 없는 한 어큐뮬레이터는 액션 연산을 수행하는 메소드에서만 사용해야 한다.
그 이유는 스파크의 트랜스포메이션 연산은 액션 연산과 달리 하나의 잡 내에서 필요에 따라 수차례 반복 실행될
수 있기 때문이다. 따라서 map()이나 flatMap()과 같은 트랜스포메이션 연산 내용에 어큐뮬레이터의 값을 증가
시키는 코드가 포함될 경우 정확하지 않은 데이터가 집계될 수 있다.

[참고] 위키북스 스파크2 프로그래밍 을 정리한 내용임

'Spark > Spark2Programming' 카테고리의 다른 글

Spark SQL  (0) 2019.12.04
Spark RDD Transformation  (0) 2019.11.11
Spark RDD Action  (0) 2019.11.11

트랜스포메이션은 기존 RDD를 이용해 새로운 RDD를 생성하는 연산이다. 
각 요소의 타입을 문자열에서 숫자로 바꾸거나 불필요한 연산을 제외하거나 기존 요소의 값에 특정 값을 더하는 등의

작업이 모두 포함된다. 자주 사용하는 연산은 맵연산, 그룹화 연산, 집합 연산, 파티션 연산, 필터와 정렬 연산 등이다. 

맵(Map) 연산 : 요소간의 매핑을 정의한 함수를 RDD에 속하는 모든 요소에 적용해 새로운 RDD를 생성 
그룹화 연산 : 특정 조건에 따라 요소를 그룹화하거나 특정 함수를 적용 
집합 연산 : RDD에 포함된 요소를 하나의 집합으로 간주할 때 서로 다른 RDD 간에 합집합, 교집합 등을 계산 
파티션 연산 : RDD 파티션 개수를 조정 
필터와 정렬 연산 : 특정 조건을 만족하는 요소만 선택하거나 각 요소를 정해진 기준에 따라 정렬 


[맵 연산]  
o map 
스파크를 이용한 데이터 처리 작업에서 흔히 사용되는 대표적인 연산 중 하나이다. 
map() 메소드가 인자값으로 함수 하나를 전달받아 RDD에 속하는 모든 요소에 적용한 뒤 그 결과값으로 구성된

새로운 RDD를 생성하여 반환한다. 

val rdd_map = rdd.map(_+1) 
print(rdd_map.collect.mkString(" ")) 

 

o flatMap 
flatMap() 메소드는 map() 메소드와 유사하게 동작한다. 
두 메소드 간의 가장 큰 차이점은 인자로 전달되는 함수가 반환하는 값의 타입이 다르다는 점이다. 

map API : map[U](f:(T) => U) : RDD[U] 
flatMap API : flatMap[U](f:(T) => TraversableOnce[U]) : RDD[U] 

눈여겨 봐야할 부분은 두 메소드의  인자로 전달되고 있는 f라는 함수가 반환하는 값의 타입이다. 
f 함수를 보면 map()에서는 U타입의 출력값을 돌려주는 반면 flatMap()에서는 TraversableOnce[U]라는 타입의 
출력값을 돌려주고 있다. 
U라는 문자는 어떤 값이든 가능하다는 뜻으로 map() 메소드의 경우 인자로 사용하는 함수의 반환 타입에 제약이 

없지만 flatMap() 메소드의 경우는 일정한 규칙을 따라야 한다. 
TraversableOnce는 스칼라에서 사용하는 이터레이션(Iteration) 타입 중 하나이다.  따라서 이 선언의 의미는 flatMap에 사용하는 함수 f는 반환값으로 리스트나 시퀀스 같은  여러 개의 값을 담은 일종의 컬렉션과 유사한 타입의 값을 반환해야 한다. 

val fruits = List("apple,orange", "grape,apple,mango", "blueberry,tomato,orange") 
val rdd1 = sc.parallelize(fruits) 
val rdd2 = rdd1.map(_.split(",")) 
println(rdd2.collect().map(_.mkString("{", ", ", "}")).mkString("{", ", ", "}")) 

val rdd2 = rdd1.flatMap(_.split(",")) 
print(rdd2.collect.mkString(", ")) 

map()의 정의 f:(T)=>U 에서 T가 문자열이고 U가 배열이므로 결과값의 타입은 RDD[배열]이다. 
각 배열 속에 포함된 요소를 모두 배열 밖으로 끄집어내는 작업을 위해서는 flatMap()을 사용해야 한다. 

하나의 입력값에 대응하는 반환값이 여러 개 일 때 유용하게 사용할 수 있다. 

val rdd2 = rdd1.flatMap( log => { 
  // apple이라는 단어가 포함된 경우만 처리 
  if(log.contains("apple"){ 
    Some(log.indexOf("apple")) 
  } else{ 
    None 
  } 
}) 

Some과 None은 값이 있거나 없을 수 있는 옵션 상황을 표시하는 스칼라 타입이다. 
flatMap()에서 요구하는 TraversableOnce 타입은 아니지만 내부적인 반환에 의해  Some은 값이 있는 집합, 

None은 값이 없는 집합처럼 취급된다. 따라서 최종 결과로 모든 집합의 값을 하나로 만들면 Some에 포함된

들만 남게 된다. 


o mapPartitions 
map과 flatMap 메소드가 RDD의 각 요소를 하나씩 처리한다면 mapPartitions()는 파티션 단위로 처리한다. 

인자로 전달받은 함수를 파티션 단위로 적용하고, 그 결과로 구성된  새로운 RDD를 생성한다. 
파티션 단위로 파티션에 속한 모든 요소를 한 번의 함수 호출로 처리할 수 있기 때문에 파티션 단위의 중간 

산출물을 만들거나 데이터베이스 연결과 같은 고비용의 자원을 파티션 단위로 공유해 사용할 수 있다는 장점이 있다. 

val rdd1 = sc.parallelize(1 to 10, 3) 
val Wrdd2 = rdd1.mapPartitions(numbers => { 
  print("DB 연결") 
  numbers.map { number => number + 1 } 
}) 

numbers는 각 파티션에 담긴 요소를 가리키는 것으로 실제로는 각 파티션 내의 요소에 대한 이터레이터(Iterator)이

다. 따라서 mapPartions()에 사용되는 매핑용 함수는 각 파티션 요소에 대한 이터레이터를 전달받아 함수 내부에서

파티션의 개별 요소에 대한 작업을 처리하고 그 결과를 다시 이터레이터 타입으로 되돌려줘야 한다. 

 


o mapPartitionsWithIndex 
mapPartitionsWithIndex는 인자로 전달받은 함수를 파티션 단위로 적용하고 그 결과값으로 구성된 새로운 RDD를 

생성하는 메소드이다. 유사한 이름의 mapPartitions()와 다른 점은 인자로 전달받은 함수를 호출할 때 파티션에 속한

요소이 정보뿐만아니라 해당 파티션의 인덱스 정보도 함께 전달해 준다. 

다음은 파티션 번호(idx) 를 이용해 첫 번째 파티션에서만 결과를 추출한다. 
val rdd1 = sc.parallelize(1 to 10, 3) 
val rdd2 = rdd1.mapPartitionsWithIndex((idx, numbers) => { 
  numbers.flatMap { 
    case number if idx == 1 => Option(number + 1) 
    case _                  => None 
  } 
}) 

 

o mapValues 
RDD 요소가 키와 값의 쌍으로 이루고 있는 경우 PairRDD라는 용어를 사용한다. 
페어RDD에 속하는 데이터는 키를 기준으로 작은 그룹들을 만들고 해당 그룹들에 속한 값을  대상으로 합계나

평균을 구하는 등의 연산을 수행한다. 
mapValues()는 RDD의 모든 요소들이 키와 값의 쌍을 이루고 있는 경우메만 사용 가능한 메소드이며, 
인자로 전달받은 함수를 값에 해당하는 요소에만 적용하고 그 결과로 새로운 RDD를 생성한다. 
즉 키에 해당하는 부분은 그대로 두고 값에만 map()연산을 적용한 것과 같다. 

val rdd = sc.parallelize(List("a", "b", "c")).map((_, 1)) 
val result = rdd.mapValues(i => i + 1) 
println(result.collect.mkString("\t")) 
    


o flatMapValues 
flatMapValues는 RDD의 구성요소가 키와 값으로 이루어진 경우에만 사용할 수 있다. 
mapValues()와 마찬가지로 키는 그대로 두고 값에 해당하는 요소만을 대상으로 flatMap() 연산을 적용한다. 

val rdd = sc.parallelize(Seq((1,"a,b"),(2,"a,c"),(3,"d,c"))) 
val result = rdd.flatMapValues(_.split(",")) 
println(result.collect.mkString(" ")) 

 

[그룹화 연산]
o zip
두 개의 서로 다른 RDD를 각 요소의 인덱스에 따라 하나의(키,값)으로 묶는다.
첫 번째 RDD의 n번째 요소를 키로 하고 두 번째 RDD의 n번째 요소를 값으로 하는순서쌍을 생성한다.
두 RDD는 파티션의 개수와 각 파티션에 속하는 요소의 수가 동일하다고 가정한다.

val rdd1 = sc.parallelize(List("a","b","c"))
val rdd2 = sc.parallelize(List(1,2,3))
val result = rdd1.zip(rdd2)

 


o zipPartitions
zip() 메소드가 두 개의 서로 다른 RDD 요소를 쌍으로 묶어주는 역할만 수행했다면 zipPartitions()는 파티션

단위로 zip() 연산을 수행하고 특정 함수를 적용해 그 결과로 구성된 새로운 RDD를 생성한다.
zip과 zipPartitions의 중요한 차이점은 zipPartitions()은 요소들의 집합 단위로 병합을 실행하므로 파티션의

개수만 동일해도 된다. 두 번째로 zip()은 인자로 최대 1개의  RDD만 지정하지만 zipPartitions()은 최대 4개까지

지정할 수 있다. 마지막으로 zip()은 단지 두 RDD 요소의 쌍을 만들기만 하지만 zipPartitions()은 병합에 사용할

함수를  인자로 전달받아 사용할 수 있다.

val rdd1 = sc.parallelize(List("a","b","c"),3)
val rdd2 = sc.parallelize(List(1,2,3),3)
val result = rdd1.zipPartitions(rdd2){
  (it1, it2) => for{
  v1 <- it1
  v2 <- it2
  } yield v1+v2
}

 


o groupBy
RDD의 요소를 일정한 기준에 따라 여러 개의 그룹으로 나누고 이 그룹으로 구성된  새로운 RDD를 생성한다. 

각 그룹은 키와 그 키에 속한 요소의 시퀀스로 구성되며,  메소드의 인자로 전달하는 함수가 각 그룹의 키를 

결정하는 역할을 담당한다.

val rdd = sc. parallelize(1 to 100)
val result = rdd.groupBy{
  case i:Int if(i % 2==0) => "even"
  case _ => "odd"
}
result.collect.foreach{
  v => println(s"${v._1}, [$v._2.mkString(",")]")
}
result.foreach{ v => println(s"${v._2}")}

 


o groupBykey
groupByKey()는 키를 기준으로 같은 키를 가진 요소들로 그룹을 만들고 이 그룹들로 구성된 새로운 RDD를 생성한다.

이때 각 그룹은 groupBy()와 같이 키와 그 키에 속한  요소의 시퀀스로 구성된다.
groupBy() 메소드가 요소의 키를 생성하는 작업과 그룹으로 분류하는 작업을 동시에 수행한다면  groupByKey()는

이미 RDD의 구성요소가 키와 값의 쌍으로 이뤄진 경우에 사용 가능한 메소드이다.

val rdd = sc.parallelize(List("a","b","c","b","c")).map((_,1))
val result = rdd.groupByKey
result.collect.foreach { 
  v => println(s"${v._1}, [${v._2.mkString(",")}]")  
}
 

 

o cogroup
RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에만 사용할 수 있는 메소드이다.
여러 RDD에 같은 키를 갖는 값 요소를 찾아서 키와 그 키에 속하는 요소의 시퀀스로 구성된 튜플을 만들고, 

그 튜플들로 구성된 새로운 RDD를 생성한다.

val rdd1 = sc.parallelize(List(("k1","v1"),("k2","v2"),("k1","v3")))
val rdd2 = sc.parallelize(List(("k1","v4"),("k2","v3")))
val result = rdd1.cogroup(rdd2)
result.collect.foreach {
  case (k, (v_1, v_2)) => 
    println(s"($k, [${v_1.mkString(",")}], [${v_2.mkString(",")}])")
}

cogroup의 실행결과 생성된 result RDD는 rdd1과 rdd2의 전체 키의 개수와 동일한 두 개의  요소를 가진다. 

result의 각 요소는 튜플로 구성되는데 튜플의 첫번째 요소는 키를 의미하고 두 번째 요소는 그 키에 속하는 

값들의 시퀀스를 요소로 하는 또 다른 튜플로 구성된다.
이때 rdd1 요소 중에서 해당 키에 속하는 값들의 시퀀스가 첫 번째, rdd2의 요소의 값들은 두 번째에 위치한다.

 

 

[집합 연산]
o distinct
RDD 요소 중에서 중복을 제외한 요소로만 구성된 새로운 RDD를 생성한다.

val result = rdd.distinct()
println(result.collect.mkString(","))

 


o cartesian
두 RDD 요소의 카테시안곱을 구하고 그 결과를 요소로 하는 새로운 RDD를 생성한다.

val rdd1 = sc.parallelize(List((1,2,3)
val rdd2 = sc.parallelize(List("a","b","c"))
val result = rdd1.cartesian(rdd2)

 

o substract
rdd1과 rdd2라는 두 개의 RDD가 있을 때 rdd1에는 속하는 rdd2에는 속하지 않는 요소로 구성된
새로운 RDD를 생성한다.

val rdd1 = sc.parallelize(List("a","b","c","d","e"))
val rdd2 = sc.parallelize(List("d","e"))
val result = rdd1.substract(rdd2)

 

o union
rdd1과 rdd2라는 두 개의 RDD가 있을 때 rdd1 또는 rdd2에 속하는 요소로 구성된 새로운 RDD를 생성한다.

val rdd1 = sc.parallelize(List("a","b","c"))
val rdd2 = sc.parallelize(List("d","e"))
val result = rdd1.union(rdd2)

 

o intersection
rdd1과 rdd2라는 두 개의 RDD가 있을 때 rdd1과 rdd2에 동사에 속하는 요소로 구성된 새로운 RDD를 생성한다.
intersection()의 결과로 생성된 RDD에는 중복된 원소가 존재하지 않는다.

val rdd1 = sc.parallelize(List("a","a","b","c"))
val rdd2 = sc.parallelize(List("a","c","c","e"))
val result = rdd1.intersection(rdd2)

 

o join
RDD의 요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
두 RDD에서 서로 같은 키를 가지고 있는 요소를 모아서 그룹을 형성하고, 이 결과로 구성된 새로운 RDD를 생성한다. 

데이터베이스의 조인과 같은 유사한 동작을 수행한다.

val rdd1 = sc.parallelize(List("a","b","c","d","e")).map(_, 1)
val rdd2 = sc.parallelize(List("b","c")).map(_, 2)
val result = rdd1.join(rdd2)

 

o leftOuterJoin, rightOuterJoin
RDD의 구성 요소가 키와 값으로 구성된 경우 사용할 수 있다.
rdd1과 rdd2라는 두 개의 RDD가 있을 때 두 메소드는 이름 그대로 왼쪽 외부 조인과 오른쪽 외부 조인을 수행하고
그 결과로 구성된 새로운 RDD를 생성한다.

val rdd1 = sc.parallelize(List("a","b","c")).map(_, 1)
val rdd2 = sc.parallelize(List("b","c")).map(_, 2)
val result1 = rdd1.leftOuterJoin(rdd2)
val result2 = rdd1.rightOuterJoin(rdd2)

 

o substractByKey
RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
rdd1과 rdd2라는 두 RDD가 있을 때 rdd1.substractByKey(rdd2)는 rdd1의 요소 중에서 rdd2에 같은 키가 존재하는
요소를 제외한 나머지로 구성된 새로운 RDD를 생성한다.

val rdd1 = sc.parallelize(List("a","b","c")).map(_, 1)
val rdd2 = sc.parallelize(List("b")).map(_, 2)
val result1 = rdd1.substractByKey(rdd2)

 


[집계 연산]
o reduceBykey
reduceByKey()는 RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드로서 같은 키를 가진

값들을 하나로 병합해 키-값 쌍으로 구성된 새로운 RDD를 생성한다.
병합을 수행하기 위해 두 개의 값을 하나로 합치는 함수를 인자로 전달받는데, 이때 함수가 수행하는 연산은 
결합법칙과 교환법칙이 성립됨을 보장해야 한다. 왜냐하면 데이터가 여러 파티션에 분산돼 있어서 항상 같은 순서로

연산이 수행됨을 보장할 수 없기 때문이다.

- 두 개의 정수를 더하는 함수를 병합 함수로 사용
val rdd = sc.parallelize(List("a","b","c")).map(_, 1)
val result = rdd.reduceByKey(_ + _)

 

o foldbyKey
RDD 구성요소가 키와 값으로 구성된 경우에 사용할 수 있는 메소드이다. 전반적인 동작은  reduceBykey() 메서드와

유사해서 같은 키를 가진 값을 하나로 병합해 키-값 쌍으로 구성된 새로운 RDDㄹ르 생성한다. 하지만 reduceBykey와는

달리 병합 연산의 초기값을 메소드의 인자로 전달해서 병합시 사용할 수 있다는 점에서 차이가 있다.
교환법칙을 만족하지 않더라고 해당 연산에 대한 결합법칙만 만족한다면 사용할 수 있다. 단 이때 초기값은 여러 번 

반복해도 연산 결과에는 영향을 주지 않는 값이어야 한다.

- 초기값 0을 인자로 두 개의 정수를 더하는 함수를 병합 함수로 사용
val rdd = sc.parallelize(List("a","b","c")).map(_, 1)
val result = rdd.foldByKey(0)(_ + _)

 

o combineByKey
combineBykey()는 RDD의 구성요고가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
reduceBykey()나 foldByKey()와 유사하게 같은 키를 가진 값들을 하나로 병합하는 기능을 수행하지만 수행하는 과정에

서 값의 타입이 바뀔 수 있다는 점에서 두 메소드와 차이가 있다.

def reduceByKey(func: (V,V) => V): RDD[(K,V)]
def foldByKey(zeroValue: V)(func: (V,V) => V): RDD[(K,V)]
def combineByKey(createCombiner: (V) => C, mergeConbiners: (C,C) => C): RDD[(K,C)]

reduceBykey와 foldByKey는 RDD[K,V], 즉 키의 타입이 K이고 값의 타입이 V인 FairRDDFunctions에 정의된 메소드인데 병합에 사용되는 함수와 최종 결과로 만들어진 RDD가 모두 원래의 RDD와 같은 키의 타입은 K, 값의 타입은 V를 유지하고 있다. 이에 반해 combineByKey는 최종 결과 RDD가 C 타입의 값을 가진 RDD[K,C]로 바뀌어 있다.

- createCombiner() : 값을 병합하기 위한 컴바이너를 생성한다. 컴바이너는 두 개의 값을 하나로 병합하는 객체이다.
  컴바이너는 각 키별로 병합을 수행하고 그 결과를 내부에 저장한다. 만약 특정 키에 대해 정의된 컴바이너가 없다면
  이함수를 이용해 컴바이너를 생성한다.
- mergeValue() : 키에 대한 컴바이너가 이미 존재한다면 새로운 컴바이너를 만들지 않고 이 함수를 이용해 값을 기존 
  컴바이너에 병합시킨다. 각 키별 컴바이너 내부에는 병합된 값들이 누적되어 쌓이게 된다.
- mergeCombiners() : createCombiner()와 mergeValue()는 파티션 단위로 수행된다. mergeCombiner()는 병합이 끝난 
  컴바이너들 끼리 다시 병합을 수행해 최종 컴바이너를 생성한다. 파티션 단위로 키별 컴바이너를 생성해서 병합을 
  수행하고 이렇게 생성된 컴바이너를 모두 벙합해 최종 결과를 생성한다.

- 평균 구하기
case class Record(var amount: Long, var number: Long=1){
  def map(v: Long) = Record(v)
  def add(amount: Long): Record = {
    add(map(amount))
  }
  def add(other:Record): Record = {
    this.number += other.number
    this.amount += other.amount
  }
  override def toString: String = s"avg:${amount/number}"
}

val data = Seq("Math",100L),("Eng",80L),("Math",50L),("Eng",60L),("Eng",90L))
val rdd = sc.parallelize(data)
val createCombiner = (v:Long) => Record(v)
val mergeValue = (c:Record, v:Long) => c.add(v)
val mergeCombiners = (c1:Record, c2:Record) => c1.add(c2)
val result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiner)
 

o aggregateBykey
aggregateBykey()는 RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
combineByKey()의 특수한 경우로서 병합을 시작할 초깃값을 생성하는 부분을 제외하면 combineByKey()와
동일한 동작을 수행한다.

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C,V) => C, mergeCombiners: (C,C) => C): RDD[(K,C)]
def aggreateBykey[U](zeroValue:U)(seqOp: (U,V) => U, comOp: (U,U) => U): RDD[(K,C)]

mergeValue와 seqOP, mergeCombiners와 combOp는 서로 같은 역할ㅇ르 하는 함수이다. 차이점이 있다면
combineByKey메소드에서는 병합을 위한 초깃값을 알기 위해 createCombiner라는 함수를 사용하고
aggregateByKey는 zeroValue라는 값을 사용한다. 따라서 aggregateByKey는 combineByKey의 createCombiner함수의 
특정 값 zero를 돌려주는 함수를 사용한 경우이다.

val data = Seq("Math",100L),("Eng",80L),("Math",50L),("Eng",60L),("Eng",90L))
val rdd = sc.parallelize(data)
val zero = Record(0,0)
val mergeValue = (c: Record, v:Long) => c.add(v)
val mergeCombiners = (c1: Record, c2:Record) => c1.add(c2)
val result = rdd.aggregateByKey(zeroad)(mergeValue, mergeCombiners)


[Pipe 및 파티션 연산]
o pipe
pipe를 이용하면 데이터를 처리하는 과정에서 외부 프로세스를 활용할 수 있다.
다음은 세 개의 숫자로 구성된 문자열을 리눅스 cut유틸리티를 이용해 분리한 뒤 첫 번째와 세 번째 숫자를 뽑아낸다.

val rdd = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
val result = rdd.pipe("cut -f 1,3, -d ,")

 


o coalesce, repatition
RDD를 생성한 뒤 filter() 연산을 비롯한 다양한 트랜스포메이션 연산을 수행하다 보면 최최에 설정된 파티션 개수가 

적합하지 않은 경우가 발생할 수 있다. 이 경우 coalesce()나 repartition() 연산을 사용해 현재의 RDD의 파티션 개수를

조정할 수 있다.

두 메소드는 모두 파티션의 크기를 나타내는 정수를 인자로 받아서 파티션의 수를 조정할 수 있지만 처리 방식에 따른 성능의 차이가 있다. repartition이 파티션 수를 늘리거나 줄이는 것을 모두 할 수 있는 반면 coalesce는 줄이는 것만 가능하다.
repartition은 셔플을 기반으로 동작을 수행하는 반면 coalesce는 강제로 셔플을 수행하라는 옵션을 지정하지 않는 한

셔플을 사용하지 않기 때문에 성능이 좋다.

val rdd1 = sc.parallelize(1 to 1000000, 10)
val rdd2 = rdd1.coalesce(5)
val rdd3 = rdd2.repartition(10)
println(s"partiton size : ${rdd2.getNumPartitions}")
println(s"partiton size : ${rdd3.getNumPartitions}")

 

o repatitionAndSortWithinPartitions
파티션을 조정해야 하는 경우는 파티션 크기 외에도 다양하게 발생할 수 있다. 같은 성격을 지닌 데이터를 같은 파티션으로 분리하고 싶을 때이다.
하둡 맵리듀스 프로그램의 경우 동일한 성격을 가진 데이터를 하나의 리듀서에서 처리하기 위해 매퍼 출력 데이터의

키와 파티션을 담당하는 파티셔너(Partitioner), 그리고 소팅을 담당하는 컴퍼레이터(Comparator) 모듈을 조합해서 

프로그램을 작성하는 패턴을 오래전부터 사용해 오고 있다.

repatitionAndSortWithinPartitions는 RDD를 구성하는 모든 데이터를 특정 기준에 따라 여러 개의 
파티션으로 분리하고 각 파티션 단위로 정렬을 수행한 뒤 이 결과로 새로운 RDD를 생성해 주는 메소드이다.
이 메소드를 사용하려면 우선 데이터가 키와 값 쌍으로 구성돼 있어어 하고 메소드를 실행 할 때 각 데이터가 어떤 

파티션에 속할지 결정하기 위한 파티셔너(Partitioner)를 설정해야 한다.

파티셔너는 각 데이터의 키 값을 이용해 데이터가 속할 파티션을 결정하게 되는데, 이때 키 값을 이용한 정렬도 함께

수행된다. 즉 파티션 재할당을 위해 셔플을 수행하는 단계에서 정렬도 함께 다루게 되어 파티션과 정렬을 각각 따로

하는 것에 비해 더 높은 성능을 발휘할 수 있다.

val r = scala.util.Random
val data = for(i <- 1 to 10) yield (r.nextInt(100), "-")
val rdd1 = sc.parallelize(data)
val rdd2 = rdd1.repatitionAndSortWithinPartitions(new HashPartitioner(0))
rdd2.foreachPartiton(it => {println("=====";it.foreach(v => println(v)) })

rdd가 준비되면 파티셔너로 HashPartitioner를 설정하고 크기를 3으로 지정한다. 따라서 모든 데이터는 3으로 나눈 

나머지 값에 따라 총 3개의 파티션으로 분리된다.
foreachPartiton메소드는 RDD의 파티션 단위로 특정 함수를 실행해 주는 메소드이다.

 


o partitionBy
partitionBy()는 RDD가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
사용할 때 Partitoner 클래스의 인스턴스를 인자로 전달해야 한다.
Partitioner는 각 요소의 키를 특정 파티션에 할당하는 역할을 수행하는데 스파크에서 기본적으로 제공하는 것으로 HashPartitioner와 RangePartitioner의 두 종류가 있다. 만약 RDD의 파티션 생성 기준을 변경하고 싶다면 직접 

Partitioner 클래스를 상속하고 커스트마이징한 뒤 partitonBy() 메소드의 인자로 전달해서 사용하면 된다.

val rdd1 = sc.parallelize(List("apple","mouse","monitor"),5).map{a => (a, a.length) }
val rdd2 = rdd1.partitionBy(new HashPartitioner(3))
println(s"rdd1:${rdd1.getNumPartitons}, rdd2:${rdd2.getNumPartitons}")


[필터와 정렬 연산]
o filter
filter()는 단어의 뜻 그대로 RDD의 요소 중에서 원하는 요소만 남기고 원하지 않는 요소는 걸러내는 
동작을 하는 메소드이다. 동작 방식은 RDD의 어떤 요소가 원하는 조건에 부합하는지 여부를 참 or 거짓으로
가려내는 함수를 RDD의 각 요소에 적용해 그 결과가 참인 것은 남기고 거짓인 것은 버린다.

val rdd = sc.parallelize(1 to 5)
val result = rdd.filter(_ > 2)

filter 연산은 처음 RDD를 만들고 나서 다른 처리를 수행하기 전에 불필요한 요소를 사전에 제거하는
목적으로 많이 사용된다. 필터링 연산 후에 파티션 크기를 변경하고자 한다면 이전에 살펴본 coalesce() 
메소드를 사용해 RDD 파티션 수를 맞춰 조정할 수 있다.

 

o sortBykey
sortBykey는 키 값을 기준으로 요소를 정렬하는 연산이다. 키 값을 기준으로 정렬하기 때문에 단연히
모든 요소가 키와 값 형태로 구성되어 있어야 한다.

val rdd = sc.parallelize(List("q","z","a"))
val result = rdd.map((_,1)).sortByKey()

 

o keys, values
keys()와 values()는 RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메소드이다.
두 메소드의 역할은 이름만 보고도 짐잓할 수 있는데, keys()는 키에 해당하는 요소로 구성된 RDD를
생성하고 values()는 값에 해당하는 요소로 구성된 RDD를 돌려준다.

val rdd = sc.parallelize(List(("k1","v1"),("k2","v2"),("k3","v3"))
println(rdd.keys.collect.mkString(","))
println(rdd.values.collect.mkString(","))

 

o sample
sample() 메소드를 이용하면 샘플을 추출해 새로운 RDD를 생성할 수 있다.

sample(withReplacement: Boolean, fraction: Double, seed:Long = Utils.random.nextLong): RDD[U]

withReplacement는 복원 추출을 수행할지 여부를 결정하는 것으로 true로 설정되면 복원추출을 수행한다.
fraction은 복원 추출과 비복원 추출일 경우 의미가 달라진다. 복원 추출일 경우 샘플 내에서 각 요소가 
나타나는 횟수에 대한 기대값, 즉 각 요소의 평균 발생 횟수를 의미하며, 반드시 0 이상의 값을 지정해야 한다.
비복원 추출일 경우에는 각 요소가 샘플에 포함될 확률을 의미하며 0과 1사이의 값으로 지정할 수 있다.
seed는 일반적인 무작위 값 추출 시 사용하는것과 유사한 개념으로 반복 시행시 결과가 바뀌지 않고 일정한 

값이 나오도록 제어하는 목적으로 사용할 수 있다.

sample메소드가 샘플의 크기를 정해놓고 추출을 실행하는 것인 아니다. fraction 0.5를 지정한다고 해서
전체 크기의 절반에 해당하는 샘플이 추출되는 것으로 기대해서는 안된다.
정확한 크기의 샘플을 추출하고자 한다면 takeSample 메소드를 사용해야 한다.

val rdd = sc.parallelize(1 to 100)
val result1 = rdd.sample(false, 0.5)
val result2 = rdd.sample(true, 1.5)

 

[참고] 위키북스 스파크2 프로그래밍 을 정리한 내용임

'Spark > Spark2Programming' 카테고리의 다른 글

Spark SQL  (0) 2019.12.04
Spark RDD  (0) 2019.11.26
Spark RDD Action  (0) 2019.11.11

RDD Action

액션은 메소드 중에서 결과값이 정수나 리스트, 맵 등 RDD가 아닌 다른 타입인 것을 통칭해서 부르는 용어이다.
결과값이 RDD인 메소드는 트랜스포메이션이라는 유형으로 분류되는데 모두 느슨한 평가(lazy evaluation) 라는 
방식을 채택하고 있다는 공통점을 가지고 있다.

느슨한 평가 방식을 따른다는 것은 메소드를 호출하는 시점에 바로 실행하는 것이 아니고 계산에 필요한
정보를 누적해서 내포하고 있다가 실제로 계산이 필요한 시점에 실행된다는 것을 의미한다. 이런 방식은
소위 함수형 프로그래밍 과정에서 특히 자주 활용되며, 언어에 따라 조금씩 다른 설계 및 구현 방법이 
알려져 있다.

RDD의 경우 트랜스포메이션에 속하는 메소드는 느슨한 평가 방식을 사용한다. 따라서 호출하는 시점에
실해되는 것이 아니라 액션으로 분류되는 메소드가 호출돼야만 비로소 실행된다.

o first
first 연산은 RDD 요소 가운데 첫 번째 요소를 반환한다. 스파크 쉡에서 작업할 때 트랜스포메이션의 수행 결과
등을 빠르게 확인하는 용도로 활용할 수 있다.

val rdd = sc.parallelize(List(5,4,1))
val result = rdd.first

o take
take()는 RDD의 첫 번째 요소부터 순서대로 n개를 추출해서 되도려주는 메소드이다. 원하는 데이터를 찾기 위해
RDD의 전체 파티션을 다 뒤지지는 않지만 최종 결과 데이터는 배열 혹은 리스트와 같은 컬렉션 타입으로 반환하기 
때문에 지나치게 큰 n값을 지정하면 메모리 부족 오류가 발생할 수 있다.

val rdd = sc.parallelize(1 to 20, 5)
val result = rdd.take(5)

o takeSample
takeSample은 RDD 요소 가운데 지정된 크기의 샘플을 추출하는 메소드이다. sample 메소드와 유사하지만 샘플의
크기를 지정할 수 있다는 점과 결과 타입이 RDD가 아닌 배열이나 리스트 같은 컬렉션 타입이라는 차이점이 있다.

val rdd = sc.parallelize(1 to 100)
rdd.takeSample(false, 10)

o collect
RDD의 모든 원소를 모아서 배열 혹은 리스트 같은 컬렉션을 반환한다.
모든 요소들이 collect 연산을 호출한 서버의 메모리에 수집된다.
따라서 전체 데이터를 담을 수 있을 정도의 충분한 메모리 공간이 있어야 한다.

val rdd = sc.parallelize(1 to 10)
val result = rdd.collect
println(result.mkString(", "))

mkString은 리스트에 담긴 요소를 하나의 문자열로 표현하는 메소드


o count
RDD를 구성하는 전체 요소의 개수를 반환한다.
 
val result = rdd.count
println(result)

o countByValue
countByValue()는 RDD에 속하는 각 값들이 나타나는 횟수를 구해서 맵 형태로 반환한다.

val rdd = sc.parallelize(List(1,1,2,3,3,))
val result = rdd.countByBalue

o reduce
reduce는 RDD에 포함된 임의의 값 두개를 하나로 합치는 함수를 이용해 RDD에 포함된 모든
요소를 하나의 값으로 병합하고 그 결과값을 반환한다. 

def reduce(f: (T,T) => T): T

f라고 표시된 것이 리듀스 메소드의 인자로 전달되는 함수를 의미하는데, T 타입의 변수 두 개를
입력받아 역시 동일한 T 타입의 값을 돌려주는 함수를 사용하고 있다. 이때 T 타입은 RDD에 포함된
요소의 타입을 의미한다.
따라서 RDD[Int] 타입에서는 정수값 두 개를 입력으로 받아 정수값을 반환하는 함수를 사용하고,
RDD[List] 타입의 RDD에서는 리스트 두 개를 입력으로 받아 리스트를 반환하는 함수를 사용해야 한다.

스파크 어플리케이션이 클러스터 환경에서 동작하는 분산 프로그램이기 때문에 실제 병합이 첫 번째 요소부터
마지막 요소까지 순서대로 처리되는 것이 아니고 각 서버에 있는 파티션 단위로 나눠져서 처리된다. 따라서
리뷰스 메소드에 적용하는 병합 연산은 RDD에 포함된 모든 요소에 대해 교환법칙과 결합법칙이 성립되는 경우에만
사용 가능하다.

val rdd = sc.parallelize(1 to 10, 3)
val result = rdd.reduce( _ + _)

o fold
fold는 reduce와 같이 RDD 내이 모든 요소를 대상으로 교환법칙과 결합법칙이 성립되는 바이너리 함수를 
순차 적용해 최종 결과를 구하는 메소드이다. 
fold와 reduce의 차이점은 reduce 연산이 RDD에 포함된 요소만 이용해 병합을 수행하는 데 반해 fold 연산은
벙합 연산의 초기값을 지정해 줄 수 있다

def fold(zeroValue: T)(op: (T,T) => T)

op는 병합에 사용하는 함수를 의미하는 것으로, reduce에서도 동일한 형태로 정의된 함수를 사용하고 있다.
zeroValue에 해당하는 값은 병합이 시작될 때 op 함수의 첫 번째 인자값으로 전달된다.
fold 메소드도 reduce와 마찬가지로 여러 서버에 흩어진 파티션에 대해 병렬로 처리된다. 따라서 fold에 지정된
초기값은 각 파티션별로 부분 병합을 수행할 때마다 사용되기 때문에 여러 번 반복 적용돼도 문제가 없는 값을
사용해야 한다.

val rdd = sc.parallelize(1 to 10, 3)
val result = rdd.fold(0)(_ + _)

reduce 연산의 경우 RDD에 포함된 요소만 병합에 수행하기 때문에 파티션에 속하는 원소가 하나도 없다면
해당 파티션에서는 처리가 수행되지 않지만 fold 연산의 경우 파티션에 속하는 요소가 하나도 없더라도
초기값으로 인해 최소 한번의 연산이 수행된다.

o aggregate
reduce나 fold 메소드의 경우 모두 입력과 출력 타입이 동일해야 한다는 제약이있다.
aggregate 메소드는 타입 제갸 사항이 없기 때문에 입력과 출력의 타입이 다른 경우에도 사용할 수 있다.

메소드의 인자로 3개를 사용하는데 첫 번째는 fold메소드와 유사한 초기값을 지정하는 것이고, 두 번째는
각 파티션 단위 부분합을 구하기 위한 병합함수, 그리고 이렇게 파티션 단위로 생성된 부분합을 최종적으로
하나로 합치기 위한 또 다른 병합함수로 구성된다.

def aggregate[U](zeroValue: U)(seqOp: (U,T) => U, comOp: (U,U) => U)(implicit arg0: ClassTag[U]): U

첫 번째 인자인 zeroValue는 병합의 초기값으로 사용할 값이다. 두 번째 인자인 seqOp는 U와 T타입의 값을
입력값으로 전달받아 U 타입의 값을 돌려주고 있는데 이때 T는 RDD의 요소들이 갖는 타입을 의미하며, U는
zeroValue로 전달했던 초기값과 같은 타입을 의미한다. 예를 들어, RDD[String] 타입의 RDD에서 zeroValue로
비어있는 Set을 사용했다면 seqOp는 (Set,String) => Set과 같은 형태가 돼야 한다.
aggregate는 두 단계에 결처 병합ㅇ르 처리하는데 첫 번째는 파티션 단위로 병합을 수행하고 두 번째 단계에서는
파티션 단위 병합 결과끼리 다시 병합을 수행해서 최종 결과를 생성한다. 이때 파티션 단위 병합에는 첫 번째 인자인
seqOp 함수가 사용되며 두 번재 최종 병합에는 두 번째 인자인 combOp 함수가 사용된다.

val rdd = sc.parallelize(List(100, 80, 75, 90, 95,), 3)
val zeroValue = Record(0,0)
val seqOp = (r:Record, v:Int) => r.add(v)
val comOp = (r1:Record, r2:Record) -> r1 add r2
val result rdd.aggregate(zeroValue)(seqOp, combOp)

val result2 = rdd.aggregate(Record(0,0))(_ add _, _ add _)

o sum
RDD를 구성하는 요소의 타입에 따라 좀 더 특화된 편리한 연산을 제공하기 위해 특정 타입의 요소로 구성된 
RDD에서만 사용 가능한 메소드를 정의하고 있다. 대표적인 경우로 키와 값 형태의 요소를 위한 reduceByKey나
combineBykey 등을 들 수 있다.
sum 메소드도 그중 하나로서, RDD를 구성하는 모든 요소가 double, Long 등 숫자 타입일 경우에만 사용 가능하며
전체 요소의 합을 구한다.

val rdd = sc.parallelize(1 to 10)
val result = rdd.sum

o foreach, foreachPartition
foreach는 RDD의 모든 요소에 특정 함수를 적용하는 메소드이다. 인자로 한개의 입력값을 가지는 함수를 전달받는데
이렇게 전달받은 함수에 각 RDD 요소를 하나씩 입력값으로 사용해 해당 함수를 실행한다.

foreachPartition도 foreach와 같이 실행할 함수를 인자로 전달받아 사용하는데, foreach와 다른 점은 해당 함수를
개별 요소가 아닌 파티션 단위로 적용한다는 점이다. 이와 유사한 메소드로는 mapPartitions, mapPartitionsWithIndex가 
있는데 mapPartitions가 함수의 실행 결과로 새로운 RDD를 되돌려 주는 데 반해 foreachPartitions는 함수를 실행할 뿐
결과값을 돌려주지 않는다는 차이점이 있다.

val rdd = sc.parallelize(1 to 10, 3)
rdd.foreach { v =>
  println(s"Value Side Effect : ${v}")
}
rdd.foreachPartition(values => {
  println("Partition Side Effect!")
  for(v <- values println(s"Value Side Effect : ${v})
)

o toDebugString
toDebugString 메소드는 이름 그대로 디버깅을 위한 메소드이다. RDD의 파티션 개수나 의존성 정보 등 세부정보를 알고
싶을때 사용할 수 있다. 

val rdd = sc.parallelize(1 to 100, 10).map(_ * 2).persist.map(_ + 1).coalesce(2)

println(rdd.toDebugstring)

o cache, persist, unpersist
RDD는 액션 연산이 수행될 때마다 관련 트랜스포메이션 연산을 반복한다. 이때 기존에 사용했던 데이터가 메모리에 남아
있다면 그 데이터를 사용하지만 다른 이유로 인해 데이터가 남아 있지 않다면 RDD 생성 히스토리를 이용해 복구하는
단계를 수행한다. 따라서 다수의 액션 연산을 통해 반복적으로 사용되는 RDD인 경우 데이터를 메모리 등에 저장해 두는 
것이 매번 새로운 RDD를 만들어 내는 것보다 유리하다.

cache와 persist는 첫 액션을 실행한 후 RDD 정보를 메모리 또는 디스크 등에 저장해서 다음 액션을 수행할 때 불필요한
재성성 단계를 거치지 않고 원하는 작업을 즉시 실행할 수 있게 해주는 메소드이다. 이 중에서 cache는 RDD의 데이터를
메모리에 저장하라는 의미로, 만약 메모리 공간이 충분치 않다면 부족한 용량만큼 저장을 수행하지 않게 된다. 이에 반해
persist는 StorageLevel이라는 옵션을 이용해 저장 위치와 저장방식 등을 상세히 지정할 수 있는 기능을 제공한다.
unpersist 메소든느 이미 저장 중인 데이터가 더 이상 필요없을 때 캐시 설정을 취소하는데 사용된다.

val rdd = sc.parallelize(1 to 100, 10)
rdd.cache
rdd.persist(StorageLevel.MEMORY_ONLY)

o partitions
partitins는 RDD의 파티션 정보가 담긴 배열ㅇ르 돌려준다. 이때 배열에 담김 요소는 Partition 타입 객체이며, 파티션의
인덱스 정보를 알려주는 index() 메소드를 포함하고 있다. partitions는 파티션의 크기를 알아보기 위한 용도로 많이
활용하기도 하는데 단순히 크기 정보만 알아볼 목적이라면 getNumPartitions 메소드를 사용하는 것이 더 편리하다.

val rdd = sc.parallelize(1 to 100, 10)
println(rdd.partitinos.size)
println(rdd.getNumPartitions)

[참고] 스파크2 프로그래밍 위키북스

'Spark > Spark2Programming' 카테고리의 다른 글

Spark SQL  (0) 2019.12.04
Spark RDD  (0) 2019.11.26
Spark RDD Transformation  (0) 2019.11.11
소프트웨어 아키텍처는 기술과 비즈니스, 사회적인 영향 요소가 어우러진 결과물이며, 아키텍처는 기술과 비즈니스, 사회 환경에 영향을 준다. 또한 이런 환경 요소들은 다시 미래 아키텍처에 영향을 미친다. 아키텍처에서 환경으로, 환경에서 아키텍처로 서로 영향을 주고 받는 순환관계를 아키텍처 비즈니스 사이클(ABC, Architecture Business Cycle)이라고 한다.

아키텍처에 영향을 주는 요인

아키텍처는 업무적 요소와 기술적 요소가 결합된 결과물이다. 아키텍처를 설계하는데 영향을 주는 요소는 여러 가지고 있다. 이런 영향 요소는 아키텍처가 실제로 수행될 환경에 따라 변하기 마련이다.
아키텍트는 제품과 관련된 이해관계자들이 요구사항에 영향을 받는다. 뿐만아니라 개발 조직의 목표나 구성, 기술 환경, 아키텍트의 지식과 경험 등도 영향을 주는 요소라 할수 있다.

▪ 이해관계자
개발할 시스템에 관심을 갖고 있는 모든 사람을 조직을 이해관계자라고 한다. 고객, 사용자, 개발자, 프로젝트관리자, 유지보수 담당자, 심지어 마케팅 담당자도 포함될 수 있다. 이해관계자는 각자 시스템에 대한 다양한 요구사항을 제시한다.
다음의 일반적인 시스템 특성은 아키텍처의 전반적인 설계사항을 결정하게 된다. 모든 특성들은 시스템의 이해관계자와 깊은 관계가 있으므로 아키텍터는 반드시 그들의 목소리에 귀기울여야 한다.

일반적인 시스템 특성
- 성능(Performance)
- 신뢰성(reliability)
- 가용성(availability)
- 플랫폼 호환성(platform comopatibility)
- 메모리 사용 효율(memory utilization)
- 네트워크 사용량(network usage)
- 변경용이성(midifiability)
- 사용편의성(usability)
- 타시스템과의 호환성(interoperability)

이해관계자마다 목적이 다르고 따라서 이들의 요구사항이 서로 상충될 수 있다. 일반적인 특성들은 요구사항 기술서에 정의되지만 시스템의 특성은 충분히 표현되지 않을 뿐만 아니라 테스트 가능한 수준까지 기술되지 않는다. 아키텍트는 이렇게 부족한 시스템 특성을 모두 기록해서 테스트 가능한 수준까지 작성할 수 있어야 하며 시스템의 품질속성 간에 상충되는 부분에 대해 합의점을 찾아내야 한다. 

▪ 개발조직
조직의 목표는 일반적으로 요구사항에 반영되지만, 개발 조직의 특성이나 구조는 아키텍처에 영향을 준다. 개발자들의 숙력도, 개발 일정과 예산도 아키텍처에 영향을 주는 요인이다.

개발 조직이 아키텍처에 끼치는 영향은 단기 사업, 장기 사업, 조직 구성 세 부분으로 나눌 수 있다.
- 아키테처나 아키텍처를 구현한 제품으로 단기 사업을 진행할 수 있다.
제품뿐 아니라 제품의 아키텍처 또한 유사한 프로젝트에서 재사용돼 프로젝트 개발 비용을 줄일 수 있음으로 제품으로서의 가치를 지닌다.
- 장기 사업에 투자하는 것을 전략 목표로 가져갈 수 있다.
제안 시스템을 재무 수단으로 삼아 조직 가반을 확장 할수 있다.
- 조직은 아키텍처의 형태를 결정한다.
하위 시스템은 특정한 기술을 필요로 하므로 그 기슬을 가진 별도 조직과 계약을 하게 된다. 이것은 아키텍처를 기능으로 나누는 중요한 요인이므로 아키텍트는 특정 기술의 하위 시스템을 별도의 모듈로 나눠 아키텍처를 수립해야 한다.
 
▪ 배경과 경험
아키텍트의 교육과 연습, 자주 사용되는 아키텍처 패턴, 시스템에 적용해본 경험 등을 통해 아키텍처가 수립된다. 또한 책이나 교육을 통해 새로 배운 아키텍처 패턴이나 기술들이 새로운 시스템에 성공적으로 적용되는지 확인하고 싶을 것이다.

▪ 기술적 경험
아키텍트의 지식과 경험은 대개 기술적  환경과 관련이 있다. 아키텍처를 수립하는 단계에서 당시의 기술적 환경은 아키텍처에 영향을 미친다. 기술적 환경은 업계의 표준적인 관행이나 아키텍트 전문가 커뮤니티에 널리 퍼진 소프트웨어 공학 기법 등을 말한다.

▪ 기타요인
아키텍트는 다양한 요구사항과 특성을 세밀히 파악해야 하고, 어느 요구사항이 더 중요한지 프로젝트 초기에 파악할 수 있어야 한다. 반드시 이해관계자들과 자발적이고 능동적인 토론을 통해 그들의 요구사항과 기대치를 알아내야 한다. 프로젝트 초기에 이해관계자들이 참여시켜 제약사항을 파악하고 기대치를 관리하면서 우선순위를 협의하고 트레이드 오프를 찾아내야 한다. 또한 이해관계자에게 모든 요구사항을 동시에 수용할 수 없음을 설명해야 한다. 상충되는 요구사항 중 어느 하나를 선택한 이유에 대해 타당한 근거를 들어 설명해야 한다. 
아키텍트가 갖추어야 할 능력은 기술 지식 뿐만아니라  절충 능력과 협상력, 대화 능력을 반드시 지녀야 한다.

 

 


'아키텍처' 카테고리의 다른 글

소프트웨어 아키텍처  (0) 2011.03.09

+ Recent posts