Airflow를 활용한 PostgreSQL에서 Elasticsearch로 데이터 마이그레이션 예제

Page content

강의 홍보

개요

  • Airflow를 활용하여 PostgreSQL에 저장된 데이터를 디스크로 다운로드 받고, 그리고 그 파일을 다시 읽어서 Elasticsearch에 저장하도록 한다.
  • 전체적인 흐름은 getData from PostgreSQL >> insertData to Elasticsearch 로 저장할 수 있다.

전체 코드 실행

  • 우선 전체 코드를 실행하도록 한다.
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd
import psycopg2 as db
from elasticsearch import Elasticsearch

def queryPostgresql():
    conn_string="dbname='python_dataengineering' host='localhost' user='postgres' password='evan'"
    conn = db.connect(conn_string)
    df = pd.read_sql("select name, city from users", conn)
    df.to_csv("dags/postgresqldata.csv")
    print("----------Data Saved------------")

def insertElasticsearch():
    es=Elasticsearch()
    df = pd.read_csv("dags/postgresqldata.csv")
    print(df.head())
    for i, r in df.iterrows():
        doc = r.to_json()
        res = es.index(index="frompostgresql", doc_type="doc", body=doc)
        print(res)

default_args = {
    'owner': 'evan',
    'start_date': dt.datetime(2021, 9, 15),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('MyDBdag',
         default_args = default_args,
         schedule_interval=timedelta(minutes=5),
         ) as dag:

    print_starting = BashOperator(task_id='starting',
                                  bash_command='echo "I am reading the PostgreSQL now....."')

    getData = PythonOperator(task_id='QueryPostgreSQL',
                             python_callable=queryPostgresql)

    insertData = PythonOperator(task_id="InsertDataElasticsearch",
                                python_callable=insertElasticsearch)

print_starting >> getData >> insertData
(venv) $ airflow webserver
(venv) $ airflow scheduler

결과 확인

  • 브라우저로 [http://localhost:8080](http://localhost:8080) 에 접속한다.
  • MyDBdag 를 실행 후, 클릭한다.

airflow_01.png

  • 그 후 시간이 지나면, 아래와 같이 성공한 것을 확인할 수 있다.

airflow_02.png

  • 정상적으로 처리가 되었는지 키바나를 확인한다.
  • frompostgresql 색인을 추가한다.
  • Discover 탭에서 frompostgresql 을 조회하면 아래와 같이 확인할 수 있다.

airflow_kibana.png

  • 만약 airflow를 계속 실행하면, 계속적으로 elasticsearch에 데이터가 추가되는 것을 확인할 수 있다.
    • Refresh 버튼을 클릭한다.
  • 이 문제를 해결하는 것이 데이터파이프라인의 핵심이며, 각 실행은 idempotent이어야 한다.
  • 이번 예제에서는 이런 부분은 다루지 않았음을 참고한다.

코드 설명 1 - queryPostgresql

  • 코드는 다음과 같다.
def queryPostgresql():
    conn_string="dbname='python_dataengineering' host='localhost' user='postgres' password='evan'"
    conn = db.connect(conn_string)
    df = pd.read_sql("select name, city from users", conn)
    df.to_csv("dags/postgresqldata.csv")
    print("----------Data Saved------------")
  • 사전 학습: 파이썬과 PostgreSQL DB 연동
  • 해당 함수는 PostgreSQL DB에 접속하여 관련 데이터를 가져오는 함수이다.
  • DB에 연결한 후, pandas 모듈을 활용하여 데이터를 가져온다.
  • 그리고, 해당 데이터를 임시적으로 csv 파일을 디스크에 저장했다.

코드 설명 2 - insertElasticsearch

  • 코드는 다음과 같다.
def insertElasticsearch():
    es=Elasticsearch()
    df = pd.read_csv("dags/postgresqldata.csv")
    print(df.head())
    for i, r in df.iterrows():
        doc = r.to_json()
        res = es.index(index="frompostgresql", doc_type="doc", body=doc)
        print(res)
  • 해당 함수는 디스크에 임시 저장된 csv 파일을 불러와서 elasticsearch에 다시 추가하는 코드이다.
  • 각 행을 JSON으로 변환하고, index 메서드를 이용하여 frompostgresql 이라는 이름으로 추가했다.

코드 설명 3 - with DAG()

  • 코드는 다음과 같다.
default_args = {
    'owner': 'evan',
    'start_date': dt.datetime(2021, 9, 15),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('MyDBdag',
         default_args = default_args,
         schedule_interval=timedelta(minutes=5),
         ) as dag:

    print_starting = BashOperator(task_id='starting',
                                  bash_command='echo "I am reading the PostgreSQL now....."')

    getData = PythonOperator(task_id='QueryPostgreSQL',
                             python_callable=queryPostgresql)

    insertData = PythonOperator(task_id="InsertDataElasticsearch",
                                python_callable=insertElasticsearch)

print_starting >> getData >> insertData

References