Airflow 활용한 DB Insert 예제 (M1, MacOS)
Page content
개요
- MySQL과 PostgreSQL에 각각 테이블 생성 후 데이터 넣기
- 아래와 같이 병렬적으로 실행하는 예제
시나리오
- 테이블과 데이터를 추가하되 두 DB에 대한 접근 방식이 다름
- MySQL은 직접적으로 넣기
- PostgreSQL은 Airflow를 통해서 데이터 넣기
환경설정
- 사전에 MySQL과 PostgreSQL이 설치가 이미 되어 있음을 가정한다.
파이썬 설치
프로젝트 초기화
- 프로젝트 디렉터리에서 다음과 순차적으로 실행
$ uv venv -p 3.11
$ source .venv/bin/activate
Airflow 설치
- 먼저 환경변수를 설정한다.
$ export AIRFLOW_HOME=$(pwd)/airflow
셸 스크립트 작성 및 실행
- 다음과 같이 셸 스크립트 작성
- 파일명 : install_airflow.sh
AIRFLOW_VERSION=2.8.0
# Python 버전을 3.11로 고정 설정
PYTHON_VERSION="3.11"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example this would install 3.0.0 with python 3.11: https://raw.githubusercontent.com/apache/airflow/constraints-3.0.0/constraints-3.11.txt
uv pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
uv pip install -r requirements.txt
- 라이브러리 목록
- 파일명 : requirements.txt
pandas
numpy
seaborn
matplotlib
requests
psycopg2-binary
streamlit
plotly
pytz
mysql-connector-python
apache-airflow-providers-postgres==5.7.1
- 셸 스크립트를 실행한다.
chmod +x install_airflow.sh
./install_airflow.sh
MySQL, PostgreSQL 설정확인
MySQL 설정
- 설정
- host : localhost
- user : root
- password : evan1234
- port : 3306
- Schema : airflow_db
PostgreSQL 설정
- 설정
- host : localhost
- user : postgres
- password : 1234
- port : 5432
- Database : python_dataengineering
최종 결과 예시
- 정상적으로 입력이 되면 다음과 같이 출력이 될 것이다. (미리 확인용)
- MySQL
- PostgreSQL
전체 코드
- 파이썬의 코드는 다음과 같다. (파일명 : mysql_postgres_dag.py)
- 이 DAG은 Airflow에서 MySQL과 PostgreSQL을 함께 사용하는 예제
- MySQL에
test_table
테이블을 생성하고 데이터를 삽입 - PostgreSQL에도 동일한 테이블을 생성하고 데이터를 삽입
- 마지막 단계에서 두 데이터베이스 모두에서 데이터를 읽어와 출력
- MySQL 연동은
mysql.connector
, PostgreSQL은 Airflow 내장PostgresHook
를 사용 - 작업은 DAG으로 연결되어 일간 스케줄로 실행되며, 실패 시 재시도 설정이 되어 있음
- MySQL에
- 각 코드에 대한 구체적인 설명은 생략한다.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
import mysql.connector
import os
# macOS에서 Airflow 네트워크 요청 문제 해결
os.environ['NO_PROXY'] = '*'
# MySQL 연결을 위한 환경 변수 설정
os.environ['AIRFLOW_CONN_MYSQL_DEFAULT'] = 'mysql+pymysql://root:evan1234@localhost/python_dataengineering'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
def create_mysql_table(**context):
# MySQL 연결 설정
connection = mysql.connector.connect(
host='localhost',
user='root',
password='evan1234',
database='airflow_db'
)
try:
cursor = connection.cursor()
# 테이블 생성
create_table_sql = """
CREATE TABLE IF NOT EXISTS test_table (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100),
value INT
);
"""
cursor.execute(create_table_sql)
connection.commit()
finally:
cursor.close()
connection.close()
def insert_mysql_data(**context):
# MySQL 연결 설정
connection = mysql.connector.connect(
host='localhost',
user='root',
password='evan1234',
database='airflow_db'
)
try:
cursor = connection.cursor()
# 데이터 삽입
insert_sql = """
INSERT INTO test_table (name, value) VALUES
('test1', 100),
('test2', 200),
('test3', 300);
"""
cursor.execute(insert_sql)
connection.commit()
finally:
cursor.close()
connection.close()
def process_data(**context):
# Get data from MySQL
mysql_connection = mysql.connector.connect(
host='localhost',
user='root',
password='evan1234',
database='airflow_db'
)
try:
cursor = mysql_connection.cursor()
cursor.execute("SELECT * FROM test_table")
mysql_data = cursor.fetchall()
print("MySQL Data:", mysql_data)
finally:
cursor.close()
mysql_connection.close()
# Get data from PostgreSQL
postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
postgres_data = postgres_hook.get_records("SELECT * FROM test_table")
print("PostgreSQL Data:", postgres_data)
with DAG(
'step10_mysql_postgres_example',
default_args=default_args,
description='A DAG demonstrating regular MySQL and PostgreSQL operators',
schedule_interval=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# MySQL Tasks using PythonOperator
create_mysql_table_task = PythonOperator(
task_id='create_mysql_table',
python_callable=create_mysql_table,
provide_context=True
)
insert_mysql_data_task = PythonOperator(
task_id='insert_mysql_data',
python_callable=insert_mysql_data,
provide_context=True
)
# PostgreSQL Tasks
create_postgres_table = PostgresOperator(
task_id='create_postgres_table',
postgres_conn_id='postgres_default',
sql="""
CREATE TABLE IF NOT EXISTS test_table (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
value INT
);
"""
)
insert_postgres_data = PostgresOperator(
task_id='insert_postgres_data',
postgres_conn_id='postgres_default',
sql="""
INSERT INTO test_table (name, value) VALUES
('test1', 100),
('test2', 200),
('test3', 300);
"""
)
# Process data from both databases
process_data_task = PythonOperator(
task_id='process_data',
python_callable=process_data,
provide_context=True
)
# Define task dependencies
create_mysql_table_task >> insert_mysql_data_task
create_postgres_table >> insert_postgres_data
[insert_mysql_data_task, insert_postgres_data] >> process_data_task
Airflow 실행
- 두개의 터미널이 필요하며 각각 airflow 환경변수가 필요함
- 먼저 하나의 터미널
- 다음과 같이 순차적으로 필요함
$ export AIRFLOW_HOME=$(pwd)/airflow
$ airflow db reset -y
$ airflow db init
$ airflow users create \
--username admin \
--firstname admin \
--lastname admin \
--role Admin \
--email admin@example.com \
--password 1234
$ airflow webserver -p 8080
Airflow PostgreSQL 설정
- AirFlow에서 제공하는 메서드 사용
- Airflow 메뉴에서 [Admin] > [Connections] > [Conn ID] > Postgres 관련 영역에서 설정 추가
- 다음과 같이 설정하는 영역에서 필요한 내용을 작성한다.
- MySQL은 여기에서는 하지 않는다.
- 다른 하나의 터미널
$ export AIRFLOW_HOME=$(pwd)/airflow
$ airflow scheduler
최종 확인
- 모두 정상적으로 동작하는 것을 확인할 수 있다.
- 또한 앞에서 봤듯이 MySQL에도 데이터가 정상적으로 들어가는 것을 확인할 수 있다.