Airflow를 활용한 Data Cleansing 예제

Page content

강의 홍보

개요

Raw 데이터 확인

  • 간단하게 Raw 데이터를 확인해보도록 한다.
import pandas as pd

df = pd.read_csv("data/scooter.csv", index_col=0)
print(df)
(venv) $ python3 step00_raw_df.py 
       trip_id  region_id  vehicle_id  ...                                  end_location_name   user_id trip_ledger_id
month                                  ...                                                                            
May    1613335        202     9424537  ...       1899 Roma Ave NE, Albuquerque, NM 87106, USA   8417864        1488546
May    1613639        202     9424537  ...    1111 Stanford Dr NE, Albuquerque, NM 87106, USA   8417864        1488838
May    1613708        202     9424537  ...  1 Domenici Center en Domenici Center, Albuquer...   8417864        1488851
May    1613867        202     9424537  ...  725 University Blvd SE, Albuquerque, NM 87106,...   8417864        1489064
May    1636714        202     8926493  ...          401 2nd St NW, Albuquerque, NM 87102, USA  35436274        1511212
...        ...        ...         ...  ...                                                ...       ...            ...
July   2482235        202     2893981  ...         1418 4th St NW, Albuquerque, NM 87102, USA  42559731        2340035
July   2482254        202     8201542  ...   302 San Felipe St NW, Albuquerque, NM 87104, USA  42457674        2339885
July   2482257        202     5136810  ...    3339 Central Ave NE, Albuquerque, NM 87106, USA  42576631        2342126
July   2482275        202     3125962  ...         1413 4th St SW, Albuquerque, NM 87102, USA  42575656        2340036
July   2482335        202     9449822  ...    3339 Central Ave NE, Albuquerque, NM 87106, USA  42586810        2342161

[34226 rows x 10 columns]

정제 코드 테스트

  • Airflow에 테스트를 하기 전, 실제 잘 작동하는지 확인하는 코드를 작업해본다.
  • 먼저 함수, cleanScooter() 함수에 대해 살펴본다.
    • 먼저 데이터셋의 컬러명은 모두 소문자로 변경한다.
    • 그 이후, 날짜 데이터를 포맷에 맞춰 변경한다.
    • 그리고, cleanscooter.csv 데이터로 내보내기를 진행했다.
  • 두번째 함수, filterData() 함수에 대해 살펴본다.
    • 데이터를 불러온뒤, 각 날짜에 조건문을 적용한 뒤, 데이터를 내보내기 하였다.
  • 마지막 함수는 최종 결과물을 콘솔창에 보여주기 위한 것으로 작업했다.
import pandas as pd

def cleanScooter(DATA_PATH):
    df = pd.read_csv(DATA_PATH + "scooter.csv", index_col=0)
    df.drop(columns=['region_id'], inplace=True)
    df.columns=[x.lower() for x in df.columns]
    df['started_at'] = pd.to_datetime(df['started_at'], format='%m/%d/%Y %H:%M')
    df.to_csv(DATA_PATH + "cleanscooter.csv")

def filterData(DATA_PATH):
    df = pd.read_csv(DATA_PATH + "cleanscooter.csv", index_col=0)
    from_date = '2019-05-23'
    to_date = '2019-06-03'
    to_from = df[(df['started_at'] > from_date) & (df['started_at'] < to_date)]
    to_from.to_csv(DATA_PATH + "may23-june3.csv")

def check_df(DATA_PATH):
    df = pd.read_csv(DATA_PATH + "may23-june3.csv")
    print(df)

if __name__ == "__main__":
    DATA_PATH = "your_path/data/"
    cleanScooter(DATA_PATH)
    filterData(DATA_PATH)
    check_df(DATA_PATH)
  • 위 파일을 실행하면 다음과 같은 결괏값이 나올 것이다.
(venv) $ python3 step01_clean_df.py
     month  trip_id  vehicle_id  ...                                  end_location_name   user_id trip_ledger_id
0      May  1636714     8926493  ...          401 2nd St NW, Albuquerque, NM 87102, USA  35436274        1511212
1      May  1636780     3902020  ...   3217 Pershing Ave SE, Albuquerque, NM 87106, USA  34352757        1511371
2      May  1636856     5192526  ...      809 Copper Ave NW, Albuquerque, NM 87102, USA  35466666        1511483
3      May  1636912     3902020  ...    802 Wellesley Dr SE, Albuquerque, NM 87106, USA  34352757        1511390
4      May  1637035     5192526  ...      809 Copper Ave NW, Albuquerque, NM 87102, USA  35466666        1511516
...    ...      ...         ...  ...                                                ...       ...            ...
6187  June  1772189     7992415  ...          200 3rd St NW, Albuquerque, NM 87102, USA  35858913        1642397
6188  June  1772212     5216813  ...           6th @ Silver, Albuquerque, NM 87102, USA  35795570        1642192
6189  June  1772216     5637480  ...          413 2nd St SW, Albuquerque, NM 87102, USA  37255146        1642781
6190  June  1772229     2739536  ...   101 Broadway Blvd NE, Albuquerque, NM 87102, USA  38038151        1643073
6191  June  1772254     6016871  ...  1st St NW @, Central Ave NW, Albuquerque, NM 8...  37761291        1642284

[6192 rows x 10 columns]
  • 약 3만개의 레코드의 행 갯수가 6000개로 줄어든 것을 확인할 수 있다.

Airflow Code

  • 이제 airflow 에서 작업을 해보도록 한다.
  • 파일명은 step05_01_clean_df.py 로 명명했다.
import pandas as pd
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def cleanScooter():
    df = pd.read_csv("dags/data/scooter.csv", index_col=0)
    df.drop(columns=['region_id'], inplace=True)
    df.columns=[x.lower() for x in df.columns]
    df['started_at'] = pd.to_datetime(df['started_at'], format='%m/%d/%Y %H:%M')
    df.to_csv("dags/data/cleanscooter.csv")

def filterData():
    df = pd.read_csv("dags/data/cleanscooter.csv", index_col=0)
    from_date = '2019-05-23'
    to_date = '2019-06-03'
    to_from = df[(df['started_at'] > from_date) & (df['started_at'] < to_date)]
    to_from.to_csv("dags/data/may23-june3.csv")

default_args = {
    'owner': 'evan',
    'start_date': dt.datetime(2021, 9, 15),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('Cleaning_Data',
         default_args = default_args,
         schedule_interval=timedelta(minutes=5),
         ) as dag:

    print_starting = BashOperator(task_id='starting',
                                  bash_command='echo "I am reading the PostgreSQL now....."')

    cleanData = PythonOperator(task_id="cleaning_df",
                               python_callable=cleanScooter)

    selectData = PythonOperator(task_id="Filtering_df",
                                python_callable=filterData)

    copyFile = BashOperator(task_id="copy_df",
                            bash_command='cp $AIRFLOW_HOME/dags/data/may23-june3.csv /your_path/Desktop/')

print_starting >> cleanData >> selectData >> copyFile
  • 기존에 작업한 코드와 크게 달라진 것은 없기 때문에, 전체 코드는 생략한다.
  • dags 안에 data 폴더를 만들어서 scooter.csv 만 넣었다. 실행 후 어떻게 새로운 데이터가 추가가 되었는지 확인해본다.
  • 마지막 코드는 최종 결과물을 복사하여 바탕화면에 붙여넣기 하는 코드를 말한다.
(venv) $ ls
scooter.csv
  • DAG가 완성되었으면, 해당 코드를 $AIRFLOW_HOME/dags 폴더에 복사하고, 다음 명령들로 웹서버와 스케줄러를 실행한다.
(venv) airflow:evan $ airflow webserver
(venv) airflow:evan $ airflow scheduler

실행결과 확인

  • 우선 웹 브라우저로 [http://localhost:8080](http://localhost:8080) 에 접속 후, 정상적으로 작업이 되는지 확인한다.

clean_df.png

  • 마지막으로 Data 폴더를 확인해본다.
(venv) your_path:data (evan)$ ls
cleanscooter.csv        may23-june3.csv         scooter.csv

References