Airflow 데이터 파이프라인 구축 예제

Page content

개요

  • 이번에는 CSV-JSON으로 데이터를 변환하는 파이프라인을 구축하도록 한다.

Step 01. Dags 폴더 생성

  • 프로젝트 Root 하단에 Dags 폴더를 만든다.
    • dags 폴더를 확인한다.
$ ls
airflow.cfg  airflow.db  dags  logs  venv  webserver_config.py

Step 02. 가상의 데이터 생성

  • 이번 테스트에서 사용할 라이브러리가 없다면 우선 설치한다.
$ pip3 install faker pandas
  • faker 라이브러리를 활용하여 가상의 데이터를 생성한다. (파일 경로 : data/step01_writecsv.py)
from faker import Faker
import csv
output=open('data.csv','w')
fake=Faker()
header=['name','age','street','city','state','zip','lng','lat']
mywriter=csv.writer(output)
mywriter.writerow(header)
for r in range(1000):
    mywriter.writerow([fake.name(),fake.random_int(min=18, max=80, step=1), fake.street_address(), fake.city(),fake.state(),fake.zipcode(),fake.longitude(),fake.latitude()])
output.close()
  • 생성된 후, 파일을 확인하도록 한다.
evan@evan:/mnt/c/airflow-test/data$ ls
data.csv  step01_writecsv.py

Step 03. csv2json 파일 구축

  • 이번에는 CSV와 JSON 변환 파일을 구축하는 코드를 작성한다. (파일 경로 : dags/csv2json.py)\
  • 주요 목적 함수 csvToJson()의 역할은 data/data.csv 파일을 불러와서 fromAirflow.json 파일로 변경하는 것이다.
  • DAG는 csvToJson 함수를 하나의 작업으로 등록하는 과정을 담는다. 작업의 소유자, 시작일시, 실패 시 재시도 횟수, 재시도 지연시 시간을 지정한다.
  • print_starting >> csvJson 에서 >> 는 하류 설정 연산자라고 부른다. (동의어 비트 자리이동 연산자)
import datetime as dt
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import pandas as pd

def csvToJson():
    df=pd.read_csv('data/data.csv')
    for i,r in df.iterrows():
        print(r['name'])
    df.to_json('fromAirflow.json',orient='records')

default_args = {
    'owner': 'evan',
    'start_date': dt.datetime(2020, 3, 18),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('MyCSVDAG',
         default_args=default_args,
         schedule_interval=timedelta(minutes=5),      # '0 * * * *',
         ) as dag:

    print_starting = BashOperator(task_id='starting',
                               bash_command='echo "I am reading the CSV now....."')
    
    csvJson = PythonOperator(task_id='convertCSVtoJson',
                                 python_callable=csvToJson)

print_starting >> csvJson

Step 04. Airflow Webserver 및 Scheduler 동시 실행

  • 이제 웹서버와 스케쥴러를 동시에 실행한다. (터미널을 2개 열어야 함에 주의한다.)
$ airflow webserver -p 8080
$ airflow scheduler
  • 이제 WebUI를 확인하면 정상적으로 작동하는 것을 확인할 수 있다.

Untitled

  • 완료된 작업의 정사각형을 클릭한 뒤, View Log 버튼을 클릭한다.

Untitled

  • Log 버튼을 클릭하면 아래와 같이 메시지가 나오면 정상적으로 작업이 완료가 된 것이다.

Untitled

Step 05. 작업 결과물 확인

  • 최초 목적인 fromAirflow.json 로 정상적으로 변환되었는지 확인하도록 한다.
    • fromAirflow.json 파일이 확인된다면, 정상적으로 작업이 끝난 것이다.
evan@evan:/mnt/c/airflow-test$ ls
airflow-webserver.pid  airflow.cfg  airflow.db  dags  data  fromAirflow.json  logs  venv  webserver_config.py

References