Spark 2
Spark 간단히 살펴보기
스파크의 기본 아키텍쳐
컴퓨터 클러스터는 여러 컴퓨터의 자원을 모아 하나의 컴퓨터처럼 사용할 수 있게 함으로써 대규모 정보를 연산할 자원과 성능을 보장합니다. 스파크는 컴퓨터 클러스터에서 작업을 조율할 수 있도록 돕는 프레임워크로 클러스터의 데이터 처리 작업을 관리하고 조율합니다.

클러스터 매니저
스파크가 연산에 사용할 클러스터는 Spark Standalone Cluster Manager, Hadoop YARN, Mesos같은 클러스터 매니저에서 관리합니다. 사용자는 클러스터 매니저에 스파크 애플리케이션을 제출하고, 이를 제출받은 클러스터 매니저는 애플리케이션 실행에 필요한 자원을 할당한다. 애플리케이션은 할당받은 자원으로 각 작업을 수행합니다.
스파크 애플리케이션
스파크 애플리케이션은 드라이버 프로세스와 다수의 익스큐터 프로세스로 구성됩니다. 드라이버 프로세스는 클러스터 노드 중 하나에서 main() 함수를 실행합니다. 이 프로세스는 애플리케이션의 수명 주기 동안 관련 정보를 유지하며, 스파크 애플리케이션 정보의 유지 관리, 사용자 프로그램이나 입력에 대한 응답, 전반적인 익스큐터 프로세스의 작업과 관련된 분석, 배포 그리고 스케줄링 역할을 수행하기에 필수적입니다. 익스큐터는 드라이버 프로세스가 할당한 작업을 수행합니다. 크게 드라이버로부터 할당된 코드를 실행하고 진행 상황을 다시 드라이버 노드에 보고하는 두 가지 역할로 구성되어 있습니다.하나의 클러스터에서 여러 개의 스파크 애플리케이션을 실행할 수 있으며, 사용자는 각 노드에 할당할 익스큐터의 수를 지정할 수 있습니다. 드라이버와 익스큐터는 프로세스로, 클러스터 모드로 서로 다른 머신에서 실행될 수도 있고, 로컬환경의 단일머신에서 스레드 형태로 실행될 수도 있습니다.
스파크의 다양한 언어 API

스파크는 다양한 프로그래밍 언어로 스파크 코드를 실행할 수 있는 언어 API를 제공합니다. Scala, Java, Python, SQL, R 언어에 맞는 핵심 개념을 제공하고, 이것은 클러스터에서 스파크 코드로 변환된 후 실행됩니다. 예를 들어, Python과 R의 경우 JVM 코드를 명시적으로 작성하는 대신, SparkSession 객체를 진입점으로 사용하여 익스큐터의 JVM에서 실행할 수 있는 스파크 코드로 변환되는 과정을 거치게 됩니다.
Spark API
다양한 언어로 스파크를 사용할 수 있는 이유는 스파크가 기본적으로 두 가지 API를 제공하기 때문입니다. 하나는 저수준의 비구조적(Unstructured) API이고, 다른 하나는 고수준의 구조적(Structured) API입니다.
SparkSession
스파크 애플리케이션은 SparkSession이라 불리는 드라이버 프로세스로 제어합니다. SparkSession 인스턴스는 사용자가 정의한 처리 명령을 클러스터에서 실행하며, 하나의 SparkSession이 하나의 스파크 애플리케이션에 대응한다. Scala, Python 콘솔을 통해 대화형 모드로 스파크를 실행하면 SparkSession이 자동으로 생성되지만, Stand-alone Application으로 실행 시 SparkSession 객체를 직접 생성해야합니다. 대화형 모드로 간단한 코드 예제를 살펴봅시다.
// spark in scala
val myRange = spark.range(1000).toDF("number")위의 코드를 실행하여 생성된 DataFrame은 1개의 칼럼과 1,000개의 로우로 구성되며 각 로우에는 0부터 999까지의 값이 할당되어 있다. 이 숫자는 분산 컬렉션을 나타내며 숫자 범위의 각 부분이 코드 실행 시 서로 다른 익스큐터에 할당됩니다.
DataFrame
DataFrame은 데이터를 Column과 Row로 표현하는 가장 대표적인 구조적 API입니다. 이때 칼럼과 칼럼 타입을 정의한 목록을 스키마라고 부릅니다. DataFrame은 유사한 Column-Row 형태인 스프레드시트와 달리 수천 대의 컴퓨터에 분산되어 있습니다. 이 방식은 단일 컴퓨터에 저장하기에 큰 양의 데이터를 저장하는 경우나 데이터 계산에 오랜 시간이 소요될 경우 유리합니다.

Partition
파티션은 클러스터의 물리적 머신에 존재하는 로우의 집합을 의미합니다. 스파크는 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 파티션이라는 청크 단위로 데이터를 분할합니다. DataFrame의 파티션은 실행 중에 데이터가 컴퓨터 클러스터에서 물리적으로 분산되는 방식을 나타냅니다. DataFrame을 이용하여 물리적 파티션에 데이터 변환용 함수를 지정하면 스파크가 실제 처리 방법을 자동으로 결정합니다. 나아가, RDD 인터페이스를 이용하는 저수준 API도 제공됩니다.
그러나 수천 개의 익스큐터가 있더라도 파티션이 하나인 경우 또는 수백 개의 파티션이 있더라도 익스큐터가 하나인 경우에는 병렬성이 1이 됩니다.
Transformation
스파크의 핵심 데이터 구조는 불변성을 가지기 때문에 한 번 생성하면 변경할 수 없습니다. DataFrame을 변경하려면 원하는 변경 방법을 스파크에게 알려줘야 하며 이때 사용하는 명령을 트랜스포메이션이라고 부릅니다. 예제를 통해 살펴봅시다.
// spark in scala
val divisBy2 = myRange.where("number % 2 = 0")위 코드는 DataFrame에서 짝수를 찾는 트랜스포메이션 예제로, 코드 실행 시 추상적인 트랜스포메이션만 지정한 상태가 됩니다. 이때 액션을 실행하지 않으면, 스파크는 실제 트랜스포메이션을 수행하지 않습니다.
좁은 의존성과 넓은 의존성
트랜스포메이션은 스파크에서 비즈니스 로직을 표현하는 핵심 개념으로, 좁은 의존성(narrow dependency)과 넓은 의존성(wide dependency)의 두 가지 유형으로 나뉩니다. 좁은 의존성을 가진 트랜스포메이션은 각 입력 파티션이 하나의 출력 파티션에만 영향을 미칩니다. 반면 넓은 의존성을 가진 트랜스포메이션은 하나의 입력 파티션이 여러 출력 파티션에 영향을 미칩니다.

위 코드의 where 구문을 좁은 트랜스포메이션의 예시로 들 수 있습니다. 좁은 트랜스포메이션을 사용하면, 스파크에서 파이프라이닝(Pipelining)을 자동으로 수행하여, DataFrame에 여러 필터를 적용하여도 모든 작업이 메모리에서 일어납니다. 넓은 트랜스포메이션의 경우 스파크가 클러스터에서 파티션을 교환하는 셔플(suffle)을 수행하는데, 셔플의 결과를 디스크에 저장한다는 점에서 위와 다른 방식으로 동작합니다. 디스크 입출력이 포함되는 연산이기에 셔플 최적화를 위한 논의가 필요합니다.
지연연산 (Lazy Evaluation)
스파크는 특정 연산 명령이 내려진 즉시 데이터를 수정하는 대신 원시 데이터에 적용할 트랜스포메이션의 실행계획을 생성합니다. 스파크는 코드를 실행하는 마지막 순간까지 대기하다가 원형 DataFrame 트랜스포메이션을 간결한 물리적 실행 계획으로 컴파일합니다. 이 동작 방식을 지연연산이라고 하며, 스파크는 지연연산을 통해 전체 데이터 흐름을 최적화하는 강점을 가집니다.일례로 **DataFrame의 조건절 푸시다운(predicate pushdown)**을 들 수 있습니다. 복잡한 스파크잡이 원시 데이터에서 하나의 로우만 가져오는 필터를 가진다면, 필터를 데이터소스로 위임하여 하나의 로우만 읽는 방식으로 최적화 작업을 수행합니다. 데이터베이스의 WHERE절로 필터를 위임하여 스파크는 하나의 레코드만 받으면 처리에 필요한 자원을 최소화할 수 있습니다.
Action
트랜스포메이션을 통해 논리적 실행 계획을 수립한 후 액션 명령을 통해 실제 연산을 수행한다. 액션은 일련의 트랜스포메이션으로부터 결과를 계산하도록 지시하는 명령으로, 콘솔에서 데이터를 보는 액션(count 등), 각 언어로 된 네이티브 객체에 데이터를 모으는 액션(aggregate 등), 출력 데이터소스에 저장하는 액션(persist 등)의 세 가지 유형으로 구분할 수 있다.
// spark in scala
divisBy2.count()액션을 지정하면 스파크 잡(job)이 시작되고, 스파크 잡은 필터(narrow T.)를 수행한 후 파티션별로 레코드 수를 카운트(wide T.)한다. 이후 각 언어에 적합한 네이티브 객체에 결과를 모은다.
Spark UI
스파크 잡은 개별 액션에 의해 트리거되는 다수의 트랜스포메이션으로 이루어져 있으며 스파크 UI를 통해 모니터링할 수 있다. 스파크 UI는 드라이버 노드의 4040 포트로 접속할 수 있으며, 클러스터에서 실행 중인 스파크 잡의 상태, 환경 설정, 클러스터 상태 등을 확인할 수 있다.
DataFrame Example
스파크는 다양한 데이터소스를 지원한다. 데이터는 SparkSession의 DataFrameReader 클래스를 사용해서 읽으며, 특정 파일 포맷과 옵션을 동시에 설정한다. 스키마 추론(Schema Inference)은 스파크가 데이터를 일부 읽어 스키마 정보를 얻고 스파크 데이터 타입에 맞게 분석하는 기능이다. 운영환경에서는 데이터를 읽는 시점에 엄격한 스키마를 지정하는 옵션을 사용하는 편이 안전하다. 데이터를 읽는 연산도 지연 연산 형태의 트랜스포메이션이기 때문에, 아래의 코드를 실행하여도 DataFrame 로우의 개수를 알 수 없다.
// spark in scala
val flightData2015 = spark
.read
.option("inferSchema", "true")
.option("header", "true")
.csv("{file_name_here}")DataFrame의 take 액션을 호출하면 Dataframe을 로컬의 배열이나 리스트의 형태로 변환하여 반환한다.
// spark in scala
flightData2015.take(3)
Array([United States, Romania, 15], [United States, Croatia, ...정수 타입인 count 칼럼을 기준으로 데이터를 정렬하는 sort (넓은) 트랜스포메이션을 추가하면 이전 DataFrame을 변환한 새로운 DataFrame을 생성하여 반환한다. 트랜스포메이션 결과가 실제 실행되지 않지만, 스파크는 실행 계획을 만들고 검토하여 클러스터에서 처리할 방법을 알아낸다. DataFrame객체에 explain 메서드를 호출하여 DataFrame의 계보(lineage)나 스파크의 쿼리 실행 계획을 확인할 수 있다. 실행 계획은 디버깅과 스파크의 실행과정을 이해하는 데에 도움을 준다.
// spark in scala
flightData2015.sort("count").explain()액션을 호출하여 트랜스포메이션 실행 계획을 시작할 수 있다. 스파크는 셔플 수행 시 기본적으로 200개의 셔플 파티션을 생성하는데, 파티션 수는 직접 지정할 수 있다.
// spark in scala
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").take(2)
... Array([United States, Singapore, 1], [Moldova, United States, 1])Last updated