Airflow 데이터 파이프라인 구축 예제
Page content
개요
- 이번에는 CSV-JSON으로 데이터를 변환하는 파이프라인을 구축하도록 한다.
Step 01. Dags 폴더 생성
- 프로젝트 Root 하단에 Dags 폴더를 만든다.
- dags 폴더를 확인한다.
$ ls
airflow.cfg airflow.db dags logs venv webserver_config.py
Step 02. 가상의 데이터 생성
- 이번 테스트에서 사용할 라이브러리가 없다면 우선 설치한다.
$ pip3 install faker pandas
- faker 라이브러리를 활용하여 가상의 데이터를 생성한다. (파일 경로 : data/step01_writecsv.py)
from faker import Faker
import csv
output=open('data.csv','w')
fake=Faker()
header=['name','age','street','city','state','zip','lng','lat']
mywriter=csv.writer(output)
mywriter.writerow(header)
for r in range(1000):
mywriter.writerow([fake.name(),fake.random_int(min=18, max=80, step=1), fake.street_address(), fake.city(),fake.state(),fake.zipcode(),fake.longitude(),fake.latitude()])
output.close()
- 생성된 후, 파일을 확인하도록 한다.
evan@evan:/mnt/c/airflow-test/data$ ls
data.csv step01_writecsv.py
Step 03. csv2json 파일 구축
- 이번에는 CSV와 JSON 변환 파일을 구축하는 코드를 작성한다. (파일 경로 : dags/csv2json.py)\
- 주요 목적 함수 csvToJson()의 역할은
data/data.csv
파일을 불러와서fromAirflow.json
파일로 변경하는 것이다. - DAG는 csvToJson 함수를 하나의 작업으로 등록하는 과정을 담는다. 작업의 소유자, 시작일시, 실패 시 재시도 횟수, 재시도 지연시 시간을 지정한다.
- 자세한 옵션은 도움말을 참조한다. https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html
print_starting >> csvJson
에서>>
는 하류 설정 연산자라고 부른다. (동의어 비트 자리이동 연산자)
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd
def csvToJson():
df=pd.read_csv('data/data.csv')
for i,r in df.iterrows():
print(r['name'])
df.to_json('fromAirflow.json',orient='records')
default_args = {
'owner': 'evan',
'start_date': dt.datetime(2020, 3, 18),
'retries': 1,
'retry_delay': dt.timedelta(minutes=5),
}
with DAG('MyCSVDAG',
default_args=default_args,
schedule_interval=timedelta(minutes=5), # '0 * * * *',
) as dag:
print_starting = BashOperator(task_id='starting',
bash_command='echo "I am reading the CSV now....."')
csvJson = PythonOperator(task_id='convertCSVtoJson',
python_callable=csvToJson)
print_starting >> csvJson
Step 04. Airflow Webserver 및 Scheduler 동시 실행
- 이제 웹서버와 스케쥴러를 동시에 실행한다. (터미널을 2개 열어야 함에 주의한다.)
$ airflow webserver -p 8080
$ airflow scheduler
- 이제 WebUI를 확인하면 정상적으로 작동하는 것을 확인할 수 있다.
- 완료된 작업의 정사각형을 클릭한 뒤, View Log 버튼을 클릭한다.
- Log 버튼을 클릭하면 아래와 같이 메시지가 나오면 정상적으로 작업이 완료가 된 것이다.
Step 05. 작업 결과물 확인
- 최초 목적인
fromAirflow.json
로 정상적으로 변환되었는지 확인하도록 한다.fromAirflow.json
파일이 확인된다면, 정상적으로 작업이 끝난 것이다.
evan@evan:/mnt/c/airflow-test$ ls
airflow-webserver.pid airflow.cfg airflow.db dags data fromAirflow.json logs venv webserver_config.py
References
- Paul Crickard, Data Engineering with Python