Apache Airflow를 활용한 CSV에서 JSON으로 변환하기

Page content

강의 홍보

개요

  • Apache Airflow에서 가장 중요한 개념은 DAG(Directed Acyclic Graph)이다.
  • DAG를 만들 시, Bash 스크립트 및 연산자(Operator)로 작업을 정의할 수 있다.
  • 이 때, 파이썬 함수로 조직화 한다.
  • Airflow 설치방법을 모른다면 다음 페이지에서 확인한다.

Step 01. Building a CSV to a JSON data pipeline

  • 우선 전체적인 코드를 확인한다.
  • 필자의 airflow 버전은 2.13이다.
  • 이 때, 몇몇 코드는 아래와 같이 발견될 수도 있다.
from airflow.operators.~~python_operator~~ import PythonOperator
## This module is deprecated. Please use `airflow.operators.python`.
  • 그럴 경우, 해당 명령어대로 수정하기를 바란다.
  • 위 부분이 교재와 코드 일부가 조금 다르며, 그 때마다 수정한다. 따라서, 교재를 보더라도 실제로는 아래 코드를 적용하거나, 아니면 airflow 1.x 버전으로 맞춰서 진행하기를 바란다.
import datetime as dt
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import pandas as pd

def csvToJson():
    df = pd.read_csv('dags/data.csv')
    for i, r in df.iterrows():
        print(r['name'])
    df.to_json('dags/fromAirflow.json', orient='records')

default_args = {
    'owner': 'evan',
    'start_date': dt.datetime(2020, 3, 18),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('MyCSVDAG',
         default_args=default_args,
         schedule_interval=timedelta(minutes=5),  # '0 * * * *',
         ) as dag:
    print_starting = BashOperator(task_id='starting',
                                  bash_command='echo "I am reading the CSV now....."')

    csvJson = PythonOperator(task_id='convertCSVtoJson',
                             python_callable=csvToJson)

print_starting >> csvJson
  • 위 코드에서 바꿔야 하는 것은 owner : evan 여기 부분만 변경하시면 된다.

Step 02. 소스코드 실행방법

  • 소스코드를 실행하려면 특정 폴더에 소스 파일과 그리고 csv 파일을 별도 폴더로 지정해줘야 한다.
  • 필자의 경우 파이썬 파일명은 ch02_airflow.py 이다.
  • 우선 Airflow 경로를 확인한다. (여기서 헤매면 계속 헤매이게 된다!)
(venv) $ echo $AIRFLOW_HOME
/Users/evan/Desktop/data_engineering_python/install_files/airflow
  • 이 때, airflow.cfg 파일에서 dags_folder 속성을 확인해본다.
  • 필자는 아래와 같이 지정이 되어 있다.
.
.
dags_folder = /Users/evan/Desktop/data_engineering_python/install_files/airflow/dags
.
.
  • 이 경로의 의미는 ch02_airflow.pydata.csv 파일이 해당 폴더 dags 안에 있어야 한다는 것이다.
  • 만약 dags 폴더가 없다면 새로 만들고, 두개의 파일을 추가하도록 한다.
  • 다시 말하지만, 대부분의 경우가 소스 파일 및 데이터 파일의 경로 문제가 대다수 에러의 원인임을 기억한다.
  • 이제 이 상태에서 아래 코드를 순차적으로 실행한다. 이 때, 터미널을 두개를 열어서 독립적으로 실행한다.
(venv) $ airflow webserver
(venv) $ airflow scheduler
  • 만약 접속이 안되거나, 그 외 대부분의 문제는 AIRFLOW 경로가 제대로 설정이 안된 경우이기 때문에, 그럴 경우에는 무조건 $AIRFLOW_HOME 설정을 다시 하도록 한다.
  • 설정 방법은 아래 코드를 참조한다.
(venv) $ export AIRFLOW_HOME="$(pwd)"
(venv) $ echo $AIRFLOW_HOME

Step 03. GUI에서 확인

  • http://0.0.0.0:8080/ 에 접속한 후, 로그인을 진행한다.
  • 그러면 아래와 같은 화면이 나타나면 정상적으로 실행이 된 것이다.

airflow_01.png

  • MyCSVDAG를 클릭하면 각 실행 일정에 따른 작업 상태를 확인할 수 있다.

airflow_02.png

  • 그리고, dags 폴더에 fromAirflow.json 가 자동으로 생성된 것을 확인할 수 있다.
(venv) :dags $ ls
__pycache__             ch02_airflow.py         data.csv                fromAirflow.json

Step 04. 에러 해결

  • 간혹 경로 에러가 나는 경우 해결 방법은 cfg 파일을 수정해야 한다.
    • 이 부분은 교재에 없는 내용이기 때문에 추가한다.
  • 기존파일은 아래와 같다.
.
.
dags_folder = /Users/evan/Desktop/data_engineering_python/install_files/airflow/dags
.
.
base_log_folder = /Users/evan/Desktop/data_engineering_python/install_files/airflow/logs
  • 변경된 파일은 아래와 같다.
.
.
airflow_home = $AIRFLOW_HOME
dags_folder = $AIRFLOW_HOME/dags
.
.
base_log_folder = $AIRFLOW_HOME/logs

Step 05. 작업 로그 메시지 확인

  • 초록색 사각형은 작업이 종료되었다는 것을 의미한다.

airflow_03.png

  • 해당 정사각형을 클릭 시, 여러 대화 상자가 나타나는데, 이 때, log 버튼을 클릭한다.

airflow_04.png

  • 파이썬 성공적으로 파일이 처리되고 각 이름값이 나오는 로그데이터를 확인할 수 있다.
  • 즉, 정상적으로 수행이 되었다는 뜻이다.

airflow_05.png

Step 06. 소스 코드 설명

  • 우선 라이브러리를 불러온다.
import datetime as dt
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import pandas as pd
  • datetime 라이브러리는 DAG의 스케줄링에 쓰인다.
  • airflow 라이브러리는 순서대로, DAG 구축, Bash Operator 작성, Python Operator 작성을 위한 도구들을 제공한다.
  • pandas 라이브러리는 CSV와 JSON 변환 시 필요한 도구들을 제공한다.

  • 이번에는 CSVToJSON() 함수를 작성한다.
    • csv 파일을 읽은 뒤, JSON 형식으로 변환하는 코드이다.
def csvToJson():
    df = pd.read_csv('dags/data.csv')
    for i, r in df.iterrows():
        print(r['name'])
    df.to_json('dags/fromAirflow.json', orient='records')

  • 위 함수를 에어플로 DAG를 위한 하나의 작업으로 등록을 하도록 한다.
  • 아래 내용은 owner(소유자), start_date(시작일시), retries (재시도 횟수), 그리고 retry_delay (재시도 지연 시간)을 지정하였다.
default_args = {
    'owner': 'evan',
    'start_date': dt.datetime(2020, 3, 18),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

  • 다음 코드가 airflow의 핵심 코드라고 보면 된다.
with DAG('MyCSVDAG',
         default_args=default_args,
         schedule_interval=timedelta(minutes=5),  # '0 * * * *',
         ) as dag:
    print_starting = BashOperator(task_id='starting',
                                  bash_command='echo "I am reading the CSV now....."')

    csvJson = PythonOperator(task_id='convertCSVtoJson',
                             python_callable=csvToJson)

print_starting >> csvJson
  • MyCSVDAG 식별자이며, 이는 airflow GUI 화면에 나타난다.
  • default_args 는 사전에 정의한 것을 기입한다.
  • schedule_interval 는 매개변수를 지정한다. 여기에서 timedelta 모듈을 사용했지만, 몇몇 예제에서는 crontab 형식의 문자열을 지정하기도 한다.
  • 각 Operators에 관한 설명은 공식 문서들을 참조한다.
  • print_starting 는 일종의 시작을 알리는 신호이며, 실제 작업은 Python Operator 내에서 작업을 한다.
    • Airflow 작업 시작 —> 파이썬 함수 실행
  • 마지막으로 각 작업들을 정의한 다음에는 작업들 사이의 관계를 설정해야 한다.
    • 비트 자리이동 연산자(»와 «)로 정의할 수 있다.
    • 또는 set_upstream()아니면 set_downstream()으로 사용이 가능하다.

References