Airflow를 활용한 PostgreSQL에서 Elasticsearch로 데이터 마이그레이션 예제
Page content
강의 홍보
- 취준생을 위한 강의를 제작하였습니다.
- 본 블로그를 통해서 강의를 수강하신 분은 게시글 제목과 링크를 수강하여 인프런 메시지를 통해 보내주시기를 바랍니다.
스타벅스 아이스 아메리카노를 선물
로 보내드리겠습니다.
- [비전공자 대환영] 제로베이스도 쉽게 입문하는 파이썬 데이터 분석 - 캐글입문기
개요
Airflow
를 활용하여PostgreSQL
에 저장된 데이터를 디스크로 다운로드 받고, 그리고 그 파일을 다시 읽어서Elasticsearch
에 저장하도록 한다.- 전체적인 흐름은
getData from PostgreSQL >> insertData to Elasticsearch
로 저장할 수 있다.
전체 코드 실행
- 우선 전체 코드를 실행하도록 한다.
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd
import psycopg2 as db
from elasticsearch import Elasticsearch
def queryPostgresql():
conn_string="dbname='python_dataengineering' host='localhost' user='postgres' password='evan'"
conn = db.connect(conn_string)
df = pd.read_sql("select name, city from users", conn)
df.to_csv("dags/postgresqldata.csv")
print("----------Data Saved------------")
def insertElasticsearch():
es=Elasticsearch()
df = pd.read_csv("dags/postgresqldata.csv")
print(df.head())
for i, r in df.iterrows():
doc = r.to_json()
res = es.index(index="frompostgresql", doc_type="doc", body=doc)
print(res)
default_args = {
'owner': 'evan',
'start_date': dt.datetime(2021, 9, 15),
'retries': 1,
'retry_delay': dt.timedelta(minutes=5),
}
with DAG('MyDBdag',
default_args = default_args,
schedule_interval=timedelta(minutes=5),
) as dag:
print_starting = BashOperator(task_id='starting',
bash_command='echo "I am reading the PostgreSQL now....."')
getData = PythonOperator(task_id='QueryPostgreSQL',
python_callable=queryPostgresql)
insertData = PythonOperator(task_id="InsertDataElasticsearch",
python_callable=insertElasticsearch)
print_starting >> getData >> insertData
- 위 함수를 실행하려면
$AIRFLOW_HOME/dags
폴더에 저장해야 하며, 필자는step04_7_airflow.py
형태로 저장했다. - 그리고, 다음과 같이 실행하면 된다. (두개의 터미널이 필요)
(venv) $ airflow webserver
(venv) $ airflow scheduler
결과 확인
- 브라우저로
[http://localhost:8080](http://localhost:8080)
에 접속한다. MyDBdag
를 실행 후, 클릭한다.
- 그 후 시간이 지나면, 아래와 같이 성공한 것을 확인할 수 있다.
- 정상적으로 처리가 되었는지 키바나를 확인한다.
frompostgresql
색인을 추가한다.- 추가하는 방법 예제: 파이썬과 엘라스틱서치 DB 연동
Discover
탭에서frompostgresql
을 조회하면 아래와 같이 확인할 수 있다.
- 만약 airflow를 계속 실행하면, 계속적으로 elasticsearch에 데이터가 추가되는 것을 확인할 수 있다.
- Refresh 버튼을 클릭한다.
- 이 문제를 해결하는 것이 데이터파이프라인의 핵심이며, 각 실행은
idempotent
이어야 한다. - 이번 예제에서는 이런 부분은 다루지 않았음을 참고한다.
코드 설명 1 - queryPostgresql
- 코드는 다음과 같다.
def queryPostgresql():
conn_string="dbname='python_dataengineering' host='localhost' user='postgres' password='evan'"
conn = db.connect(conn_string)
df = pd.read_sql("select name, city from users", conn)
df.to_csv("dags/postgresqldata.csv")
print("----------Data Saved------------")
- 사전 학습: 파이썬과 PostgreSQL DB 연동
- 해당 함수는 PostgreSQL DB에 접속하여 관련 데이터를 가져오는 함수이다.
- DB에 연결한 후, pandas 모듈을 활용하여 데이터를 가져온다.
- 그리고, 해당 데이터를 임시적으로 csv 파일을 디스크에 저장했다.
코드 설명 2 - insertElasticsearch
- 코드는 다음과 같다.
def insertElasticsearch():
es=Elasticsearch()
df = pd.read_csv("dags/postgresqldata.csv")
print(df.head())
for i, r in df.iterrows():
doc = r.to_json()
res = es.index(index="frompostgresql", doc_type="doc", body=doc)
print(res)
- 해당 함수는 디스크에 임시 저장된
csv
파일을 불러와서elasticsearch
에 다시 추가하는 코드이다. - 각 행을 JSON으로 변환하고, index 메서드를 이용하여
frompostgresql
이라는 이름으로 추가했다.
코드 설명 3 - with DAG()
- 코드는 다음과 같다.
default_args = {
'owner': 'evan',
'start_date': dt.datetime(2021, 9, 15),
'retries': 1,
'retry_delay': dt.timedelta(minutes=5),
}
with DAG('MyDBdag',
default_args = default_args,
schedule_interval=timedelta(minutes=5),
) as dag:
print_starting = BashOperator(task_id='starting',
bash_command='echo "I am reading the PostgreSQL now....."')
getData = PythonOperator(task_id='QueryPostgreSQL',
python_callable=queryPostgresql)
insertData = PythonOperator(task_id="InsertDataElasticsearch",
python_callable=insertElasticsearch)
print_starting >> getData >> insertData
- 해당 코드는 실제 작업을 수행하는 함수들을 정의한다.
- 위 함수들의 코드는 Apache Airflow를 활용한 CSV에서 JSON으로 변환하기을 사전에 수행하면 익숙하기 때문에 추가 설명은 생략한다.
References
- Data Engineering with Python by Paul Crickard, https://www.packtpub.com/product/data-engineering-with-python/9781839214189