Spark Code 실행 예제
Page content
강의소개
- 인프런에서 Streamlit 관련 강의를 진행하고 있습니다.
- 인프런 : https://inf.run/YPniH
개요
- 현재 러닝 스파크 교재를 배우고 있다.
- 해당 교재는 주로 00.py에서 실행하는 방법으로 안내하고 있지만, Google Colab에서 어떻게 변환하는지 확인해보고자 한다.
Spark 설정
- Spark 설치 버전은 아래 링크에서 확인한다.
- Download 버튼을 클릭하면 아래와 같은 화면이 나온다.
- 주소를 복사한다.
https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
- 주소를 복사한다.
Java 설치
- 아래 코드를 실행한다.
!apt-get install openjdk-8-jdk-headless
Spark 설치
- 아래 코드를 실행한다.
!wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar -zxf spark-3.5.1-bin-hadoop3.tgz
- 압축 파일을 풀었을 때 해당되는 폴더가 있는지 확인한다.
!ls
# 결과 확인 sample_data spark-3.5.1-bin-hadoop3 spark-3.5.1-bin-hadoop3.tgz
환경변수 설정
- 버전에 따라 해당 문자열은 변경할 수 있다.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"
PySpark 설치
- PySpark를 설치하기 위해 아래 코드를 순차적으로 실행한다.
!pip install findspark -q
import findspark
findspark.init()
import pyspark
spark_version = pyspark.__version__
print("Apache Spark 버전 확인: " + spark_version)
# Apache Spark 버전 확인: 3.5.1
Spark 테스트
- 먼저 해당 소스코드와 파일은 github에서 다운로드 받는다.
- 여기에서 mnmcount.py 코드를 예시로 든다.
- 먼저 mnmcount.py를 실행하려면 데이터셋을 확인한다.
- 그 후에 mnm_dataset.csv 파일을 업로드를 한다.
기존 소스코드
- 기존 코드는 아래와 같다.
from __future__ import print_function
import sys
from pyspark.sql import SparkSession
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: mnmcount <file>", file=sys.stderr)
sys.exit(-1)
spark = (SparkSession
.builder
.appName("PythonMnMCount")
.getOrCreate())
# get the M&M data set file name
mnm_file = sys.argv[1]
# read the file into a Spark DataFrame
mnm_df = (spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(mnm_file))
mnm_df.show(n=5, truncate=False)
# aggregate count of all colors and groupBy state and color
# orderBy descending order
count_mnm_df = (mnm_df.select("State", "Color", "Count")
.groupBy("State", "Color")
.sum("Count")
.orderBy("sum(Count)", ascending=False))
# show all the resulting aggregation for all the dates and colors
count_mnm_df.show(n=60, truncate=False)
print("Total Rows = %d" % (count_mnm_df.count()))
# find the aggregate count for California by filtering
ca_count_mnm_df = (mnm_df.select("*")
.where(mnm_df.State == 'CA')
.groupBy("State", "Color")
.sum("Count")
.orderBy("sum(Count)", ascending=False))
# show the resulting aggregation for California
ca_count_mnm_df.show(n=10, truncate=False)
- 수정된 소스코드
- 기존 코드에서 if 문법은 제외했다.
- from __future ~ 문법도 제외했다.
mnm_file = sys.argv[1]
도 제외했다.
import sys
from pyspark.sql import SparkSession
mnm_file = "mnm_dataset.csv"
spark = (SparkSession
.builder
.appName("PythonMnMCount")
.getOrCreate())
# get the M&M data set file name
# mnm_file = sys.argv[1]
# read the file into a Spark DataFrame
mnm_df = (spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(mnm_file))
mnm_df.show(n=5, truncate=False)
# aggregate count of all colors and groupBy state and color
# orderBy descending order
count_mnm_df = (mnm_df.select("State", "Color", "Count")
.groupBy("State", "Color")
.sum("Count")
.orderBy("sum(Count)", ascending=False))
# show all the resulting aggregation for all the dates and colors
count_mnm_df.show(n=60, truncate=False)
print("Total Rows = %d" % (count_mnm_df.count()))
# find the aggregate count for California by filtering
ca_count_mnm_df = (mnm_df.select("*")
.where(mnm_df.State == 'CA')
.groupBy("State", "Color")
.sum("Count")
.orderBy("sum(Count)", ascending=False))
# show the resulting aggregation for California
ca_count_mnm_df.show(n=10, truncate=False)
- 코드 실행 결과가 아래와 같이 나오면 된다.