Apache Airflow를 활용한 CSV에서 JSON으로 변환하기
Page content
강의 홍보
- 취준생을 위한 강의를 제작하였습니다.
- 본 블로그를 통해서 강의를 수강하신 분은 게시글 제목과 링크를 수강하여 인프런 메시지를 통해 보내주시기를 바랍니다.
스타벅스 아이스 아메리카노를 선물
로 보내드리겠습니다.
- [비전공자 대환영] 제로베이스도 쉽게 입문하는 파이썬 데이터 분석 - 캐글입문기
개요
- Apache Airflow에서 가장 중요한 개념은 DAG(Directed Acyclic Graph)이다.
- DAG를 만들 시, Bash 스크립트 및 연산자(Operator)로 작업을 정의할 수 있다.
- 이 때, 파이썬 함수로 조직화 한다.
- Airflow 설치방법을 모른다면 다음 페이지에서 확인한다.
Step 01. Building a CSV to a JSON data pipeline
- 우선 전체적인 코드를 확인한다.
- 필자의 airflow 버전은 2.13이다.
- 이 때, 몇몇 코드는 아래와 같이 발견될 수도 있다.
from airflow.operators.~~python_operator~~ import PythonOperator
## This module is deprecated. Please use `airflow.operators.python`.
- 그럴 경우, 해당 명령어대로 수정하기를 바란다.
- 위 부분이 교재와 코드 일부가 조금 다르며, 그 때마다 수정한다. 따라서, 교재를 보더라도 실제로는 아래 코드를 적용하거나, 아니면 airflow 1.x 버전으로 맞춰서 진행하기를 바란다.
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd
def csvToJson():
df = pd.read_csv('dags/data.csv')
for i, r in df.iterrows():
print(r['name'])
df.to_json('dags/fromAirflow.json', orient='records')
default_args = {
'owner': 'evan',
'start_date': dt.datetime(2020, 3, 18),
'retries': 1,
'retry_delay': dt.timedelta(minutes=5),
}
with DAG('MyCSVDAG',
default_args=default_args,
schedule_interval=timedelta(minutes=5), # '0 * * * *',
) as dag:
print_starting = BashOperator(task_id='starting',
bash_command='echo "I am reading the CSV now....."')
csvJson = PythonOperator(task_id='convertCSVtoJson',
python_callable=csvToJson)
print_starting >> csvJson
- 위 코드에서 바꿔야 하는 것은
owner
:evan
여기 부분만 변경하시면 된다.
Step 02. 소스코드 실행방법
- 소스코드를 실행하려면 특정 폴더에 소스 파일과 그리고 csv 파일을 별도 폴더로 지정해줘야 한다.
- 필자의 경우 파이썬 파일명은
ch02_airflow.py
이다. - 우선 Airflow 경로를 확인한다. (여기서 헤매면 계속 헤매이게 된다!)
(venv) $ echo $AIRFLOW_HOME
/Users/evan/Desktop/data_engineering_python/install_files/airflow
- 이 때,
airflow.cfg
파일에서dags_folder
속성을 확인해본다. - 필자는 아래와 같이 지정이 되어 있다.
.
.
dags_folder = /Users/evan/Desktop/data_engineering_python/install_files/airflow/dags
.
.
- 이 경로의 의미는
ch02_airflow.py
와data.csv
파일이 해당 폴더dags
안에 있어야 한다는 것이다. - 만약
dags
폴더가 없다면 새로 만들고, 두개의 파일을 추가하도록 한다. - 다시 말하지만, 대부분의 경우가 소스 파일 및 데이터 파일의 경로 문제가 대다수 에러의 원인임을 기억한다.
- 이제 이 상태에서 아래 코드를 순차적으로 실행한다. 이 때, 터미널을 두개를 열어서 독립적으로 실행한다.
(venv) $ airflow webserver
(venv) $ airflow scheduler
- 만약 접속이 안되거나, 그 외 대부분의 문제는 AIRFLOW 경로가 제대로 설정이 안된 경우이기 때문에, 그럴 경우에는 무조건
$AIRFLOW_HOME
설정을 다시 하도록 한다. - 설정 방법은 아래 코드를 참조한다.
(venv) $ export AIRFLOW_HOME="$(pwd)"
(venv) $ echo $AIRFLOW_HOME
Step 03. GUI에서 확인
http://0.0.0.0:8080/
에 접속한 후, 로그인을 진행한다.- 그러면 아래와 같은 화면이 나타나면 정상적으로 실행이 된 것이다.
- MyCSVDAG를 클릭하면 각 실행 일정에 따른 작업 상태를 확인할 수 있다.
- 그리고, dags 폴더에
fromAirflow.json
가 자동으로 생성된 것을 확인할 수 있다.
(venv) :dags $ ls
__pycache__ ch02_airflow.py data.csv fromAirflow.json
Step 04. 에러 해결
- 간혹 경로 에러가 나는 경우 해결 방법은
cfg
파일을 수정해야 한다.- 이 부분은 교재에 없는 내용이기 때문에 추가한다.
- 기존파일은 아래와 같다.
.
.
dags_folder = /Users/evan/Desktop/data_engineering_python/install_files/airflow/dags
.
.
base_log_folder = /Users/evan/Desktop/data_engineering_python/install_files/airflow/logs
- 변경된 파일은 아래와 같다.
.
.
airflow_home = $AIRFLOW_HOME
dags_folder = $AIRFLOW_HOME/dags
.
.
base_log_folder = $AIRFLOW_HOME/logs
Step 05. 작업 로그 메시지 확인
- 초록색 사각형은 작업이 종료되었다는 것을 의미한다.
- 해당 정사각형을 클릭 시, 여러 대화 상자가 나타나는데, 이 때,
log
버튼을 클릭한다.
- 파이썬 성공적으로 파일이 처리되고 각 이름값이 나오는 로그데이터를 확인할 수 있다.
- 즉, 정상적으로 수행이 되었다는 뜻이다.
Step 06. 소스 코드 설명
- 우선 라이브러리를 불러온다.
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd
datetime
라이브러리는 DAG의 스케줄링에 쓰인다.airflow
라이브러리는 순서대로, DAG 구축, Bash Operator 작성, Python Operator 작성을 위한 도구들을 제공한다.pandas
라이브러리는 CSV와 JSON 변환 시 필요한 도구들을 제공한다.
- 이번에는
CSVToJSON()
함수를 작성한다.- csv 파일을 읽은 뒤, JSON 형식으로 변환하는 코드이다.
def csvToJson():
df = pd.read_csv('dags/data.csv')
for i, r in df.iterrows():
print(r['name'])
df.to_json('dags/fromAirflow.json', orient='records')
- 위 함수를 에어플로 DAG를 위한 하나의 작업으로 등록을 하도록 한다.
- 아래 내용은
owner
(소유자),start_date
(시작일시),retries
(재시도 횟수), 그리고retry_delay
(재시도 지연 시간)을 지정하였다.
default_args = {
'owner': 'evan',
'start_date': dt.datetime(2020, 3, 18),
'retries': 1,
'retry_delay': dt.timedelta(minutes=5),
}
- 다음 코드가
airflow
의 핵심 코드라고 보면 된다.
with DAG('MyCSVDAG',
default_args=default_args,
schedule_interval=timedelta(minutes=5), # '0 * * * *',
) as dag:
print_starting = BashOperator(task_id='starting',
bash_command='echo "I am reading the CSV now....."')
csvJson = PythonOperator(task_id='convertCSVtoJson',
python_callable=csvToJson)
print_starting >> csvJson
MyCSVDAG
식별자이며, 이는 airflow GUI 화면에 나타난다.default_args
는 사전에 정의한 것을 기입한다.schedule_interval
는 매개변수를 지정한다. 여기에서timedelta
모듈을 사용했지만, 몇몇 예제에서는crontab 형식의 문자열을 지정
하기도 한다.- crontab 형식의 문자열을 지정하는 방법은 https://crontab.guru/ 사이트에서 확인해본다. 또는, https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html 를 참고한다.
- start_date의 실행 시점은 start_date + schedule_interval에 의해 결정한다. 만약, start_date의 값이 오늘이고, schedule_interval은 daily(매일)로 지정했다면, DAG는 내일 실행이 된다.
- 각 Operators에 관한 설명은 공식 문서들을 참조한다.
- print_starting 는 일종의 시작을 알리는 신호이며, 실제 작업은 Python Operator 내에서 작업을 한다.
- Airflow 작업 시작 —> 파이썬 함수 실행
- 마지막으로 각 작업들을 정의한 다음에는 작업들 사이의 관계를 설정해야 한다.
- 비트 자리이동 연산자(»와 «)로 정의할 수 있다.
- 또는 set_upstream()아니면 set_downstream()으로 사용이 가능하다.
References
- Data Engineering with Python by Paul Crickard, https://www.packtpub.com/product/data-engineering-with-python/9781839214189