Spark Code 실행 예제

Page content

강의소개

  • 인프런에서 Streamlit 관련 강의를 진행하고 있습니다.

개요

  • 현재 러닝 스파크 교재를 배우고 있다.
  • 해당 교재는 주로 00.py에서 실행하는 방법으로 안내하고 있지만, Google Colab에서 어떻게 변환하는지 확인해보고자 한다.

Spark 설정

Untitled

  • Download 버튼을 클릭하면 아래와 같은 화면이 나온다.
    • 주소를 복사한다. https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz

Untitled

Java 설치

  • 아래 코드를 실행한다.
!apt-get install openjdk-8-jdk-headless

Spark 설치

  • 아래 코드를 실행한다.
!wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar -zxf spark-3.5.1-bin-hadoop3.tgz
  • 압축 파일을 풀었을 때 해당되는 폴더가 있는지 확인한다.
!ls
# 결과 확인 sample_data  spark-3.5.1-bin-hadoop3  spark-3.5.1-bin-hadoop3.tgz

환경변수 설정

  • 버전에 따라 해당 문자열은 변경할 수 있다.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

PySpark 설치

  • PySpark를 설치하기 위해 아래 코드를 순차적으로 실행한다.
!pip install findspark -q
import findspark
findspark.init()
import pyspark
spark_version = pyspark.__version__
print("Apache Spark 버전 확인: " + spark_version)
# Apache Spark 버전 확인: 3.5.1

Spark 테스트

  • 먼저 해당 소스코드와 파일은 github에서 다운로드 받는다.
  • 여기에서 mnmcount.py 코드를 예시로 든다.
  • 먼저 mnmcount.py를 실행하려면 데이터셋을 확인한다.

Untitled

  • 그 후에 mnm_dataset.csv 파일을 업로드를 한다.

기존 소스코드

  • 기존 코드는 아래와 같다.
from __future__ import print_function

import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: mnmcount <file>", file=sys.stderr)
        sys.exit(-1)

    spark = (SparkSession
        .builder
        .appName("PythonMnMCount")
        .getOrCreate())
    # get the M&M data set file name
    mnm_file = sys.argv[1]
    # read the file into a Spark DataFrame
    mnm_df = (spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(mnm_file))
    mnm_df.show(n=5, truncate=False)

    # aggregate count of all colors and groupBy state and color
    # orderBy descending order
    count_mnm_df = (mnm_df.select("State", "Color", "Count")
                    .groupBy("State", "Color")
                    .sum("Count")
                    .orderBy("sum(Count)", ascending=False))

    # show all the resulting aggregation for all the dates and colors
    count_mnm_df.show(n=60, truncate=False)
    print("Total Rows = %d" % (count_mnm_df.count()))

    # find the aggregate count for California by filtering
    ca_count_mnm_df = (mnm_df.select("*")
                       .where(mnm_df.State == 'CA')
                       .groupBy("State", "Color")
                       .sum("Count")
                       .orderBy("sum(Count)", ascending=False))

    # show the resulting aggregation for California
    ca_count_mnm_df.show(n=10, truncate=False)
  • 수정된 소스코드
    • 기존 코드에서 if 문법은 제외했다.
    • from __future ~ 문법도 제외했다.
    • mnm_file = sys.argv[1] 도 제외했다.
import sys

from pyspark.sql import SparkSession

mnm_file = "mnm_dataset.csv"

spark = (SparkSession
    .builder
    .appName("PythonMnMCount")
    .getOrCreate())
# get the M&M data set file name
# mnm_file = sys.argv[1]

# read the file into a Spark DataFrame
mnm_df = (spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(mnm_file))
mnm_df.show(n=5, truncate=False)

# aggregate count of all colors and groupBy state and color
# orderBy descending order
count_mnm_df = (mnm_df.select("State", "Color", "Count")
                .groupBy("State", "Color")
                .sum("Count")
                .orderBy("sum(Count)", ascending=False))

# show all the resulting aggregation for all the dates and colors
count_mnm_df.show(n=60, truncate=False)
print("Total Rows = %d" % (count_mnm_df.count()))

# find the aggregate count for California by filtering
ca_count_mnm_df = (mnm_df.select("*")
                    .where(mnm_df.State == 'CA')
                    .groupBy("State", "Color")
                    .sum("Count")
                    .orderBy("sum(Count)", ascending=False))

# show the resulting aggregation for California
ca_count_mnm_df.show(n=10, truncate=False)
  • 코드 실행 결과가 아래와 같이 나오면 된다.

Untitled