Airflow 활용한 DB Insert 예제 (M1, MacOS)

Page content

개요

  • MySQL과 PostgreSQL에 각각 테이블 생성 후 데이터 넣기
  • 아래와 같이 병렬적으로 실행하는 예제

Screenshot 2025-05-03 at 12.40.31 PM.png

시나리오

  • 테이블과 데이터를 추가하되 두 DB에 대한 접근 방식이 다름
    • MySQL은 직접적으로 넣기
    • PostgreSQL은 Airflow를 통해서 데이터 넣기

환경설정

  • 사전에 MySQL과 PostgreSQL이 설치가 이미 되어 있음을 가정한다.

파이썬 설치

프로젝트 초기화

  • 프로젝트 디렉터리에서 다음과 순차적으로 실행
$ uv venv -p 3.11
$ source .venv/bin/activate

Airflow 설치

  • 먼저 환경변수를 설정한다.
$ export AIRFLOW_HOME=$(pwd)/airflow

셸 스크립트 작성 및 실행

  • 다음과 같이 셸 스크립트 작성
    • 파일명 : install_airflow.sh
AIRFLOW_VERSION=2.8.0

# Python 버전을 3.11로 고정 설정
PYTHON_VERSION="3.11"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example this would install 3.0.0 with python 3.11: https://raw.githubusercontent.com/apache/airflow/constraints-3.0.0/constraints-3.11.txt

uv pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
uv pip install -r requirements.txt
  • 라이브러리 목록
    • 파일명 : requirements.txt
pandas
numpy
seaborn
matplotlib
requests
psycopg2-binary
streamlit
plotly
pytz
mysql-connector-python
apache-airflow-providers-postgres==5.7.1
  • 셸 스크립트를 실행한다.
chmod +x install_airflow.sh
./install_airflow.sh

MySQL, PostgreSQL 설정확인

MySQL 설정

  • 설정
    • host : localhost
    • user : root
    • password : evan1234
    • port : 3306
    • Schema : airflow_db

PostgreSQL 설정

  • 설정
    • host : localhost
    • user : postgres
    • password : 1234
    • port : 5432
    • Database : python_dataengineering

최종 결과 예시

  • 정상적으로 입력이 되면 다음과 같이 출력이 될 것이다. (미리 확인용)
  • MySQL

Screenshot 2025-05-03 at 1.43.48 PM.png

  • PostgreSQL

Screenshot 2025-05-03 at 1.42.57 PM.png

전체 코드

  • 파이썬의 코드는 다음과 같다. (파일명 : mysql_postgres_dag.py)
  • 이 DAG은 Airflow에서 MySQL과 PostgreSQL을 함께 사용하는 예제
    1. MySQL에 test_table 테이블을 생성하고 데이터를 삽입
    2. PostgreSQL에도 동일한 테이블을 생성하고 데이터를 삽입
    3. 마지막 단계에서 두 데이터베이스 모두에서 데이터를 읽어와 출력
    4. MySQL 연동은 mysql.connector, PostgreSQL은 Airflow 내장 PostgresHook를 사용
    5. 작업은 DAG으로 연결되어 일간 스케줄로 실행되며, 실패 시 재시도 설정이 되어 있음
  • 각 코드에 대한 구체적인 설명은 생략한다.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
import mysql.connector
import os

# macOS에서 Airflow 네트워크 요청 문제 해결
os.environ['NO_PROXY'] = '*'

# MySQL 연결을 위한 환경 변수 설정
os.environ['AIRFLOW_CONN_MYSQL_DEFAULT'] = 'mysql+pymysql://root:evan1234@localhost/python_dataengineering'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def create_mysql_table(**context):
    # MySQL 연결 설정
    connection = mysql.connector.connect(
        host='localhost',
        user='root',
        password='evan1234',
        database='airflow_db'
    )
    
    try:
        cursor = connection.cursor()
        # 테이블 생성
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS test_table (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(100),
            value INT
        );
        """
        cursor.execute(create_table_sql)
        connection.commit()
    finally:
        cursor.close()
        connection.close()

def insert_mysql_data(**context):
    # MySQL 연결 설정
    connection = mysql.connector.connect(
        host='localhost',
        user='root',
        password='evan1234',
        database='airflow_db'
    )
    
    try:
        cursor = connection.cursor()
        # 데이터 삽입
        insert_sql = """
        INSERT INTO test_table (name, value) VALUES
        ('test1', 100),
        ('test2', 200),
        ('test3', 300);
        """
        cursor.execute(insert_sql)
        connection.commit()
    finally:
        cursor.close()
        connection.close()

def process_data(**context):
    # Get data from MySQL
    mysql_connection = mysql.connector.connect(
        host='localhost',
        user='root',
        password='evan1234',
        database='airflow_db'
    )
    
    try:
        cursor = mysql_connection.cursor()
        cursor.execute("SELECT * FROM test_table")
        mysql_data = cursor.fetchall()
        print("MySQL Data:", mysql_data)
    finally:
        cursor.close()
        mysql_connection.close()
    
    # Get data from PostgreSQL
    postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
    postgres_data = postgres_hook.get_records("SELECT * FROM test_table")
    print("PostgreSQL Data:", postgres_data)

with DAG(
    'step10_mysql_postgres_example',
    default_args=default_args,
    description='A DAG demonstrating regular MySQL and PostgreSQL operators',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # MySQL Tasks using PythonOperator
    create_mysql_table_task = PythonOperator(
        task_id='create_mysql_table',
        python_callable=create_mysql_table,
        provide_context=True
    )

    insert_mysql_data_task = PythonOperator(
        task_id='insert_mysql_data',
        python_callable=insert_mysql_data,
        provide_context=True
    )

    # PostgreSQL Tasks
    create_postgres_table = PostgresOperator(
        task_id='create_postgres_table',
        postgres_conn_id='postgres_default',
        sql="""
        CREATE TABLE IF NOT EXISTS test_table (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100),
            value INT
        );
        """
    )

    insert_postgres_data = PostgresOperator(
        task_id='insert_postgres_data',
        postgres_conn_id='postgres_default',
        sql="""
        INSERT INTO test_table (name, value) VALUES
        ('test1', 100),
        ('test2', 200),
        ('test3', 300);
        """
    )

    # Process data from both databases
    process_data_task = PythonOperator(
        task_id='process_data',
        python_callable=process_data,
        provide_context=True
    )

    # Define task dependencies
    create_mysql_table_task >> insert_mysql_data_task
    create_postgres_table >> insert_postgres_data
    [insert_mysql_data_task, insert_postgres_data] >> process_data_task 

Airflow 실행

  • 두개의 터미널이 필요하며 각각 airflow 환경변수가 필요함
  • 먼저 하나의 터미널
  • 다음과 같이 순차적으로 필요함
$ export AIRFLOW_HOME=$(pwd)/airflow
$ airflow db reset -y
$ airflow db init
$ airflow users create \
    --username admin \
    --firstname admin \
    --lastname admin \
    --role Admin \
    --email admin@example.com \
    --password 1234
$ airflow webserver -p 8080

Airflow PostgreSQL 설정

  • AirFlow에서 제공하는 메서드 사용
  • Airflow 메뉴에서 [Admin] > [Connections] > [Conn ID] > Postgres 관련 영역에서 설정 추가

Screenshot 2025-05-03 at 1.57.11 PM.png

  • 다음과 같이 설정하는 영역에서 필요한 내용을 작성한다.
    • MySQL은 여기에서는 하지 않는다.

Screenshot 2025-05-03 at 1.58.03 PM.png

  • 다른 하나의 터미널
$ export AIRFLOW_HOME=$(pwd)/airflow
$ airflow scheduler

최종 확인

  • 모두 정상적으로 동작하는 것을 확인할 수 있다.
  • 또한 앞에서 봤듯이 MySQL에도 데이터가 정상적으로 들어가는 것을 확인할 수 있다.

Screenshot 2025-05-03 at 2.15.26 PM.png