MySQL

Airflow 활용한 DB Insert 예제 (M1, MacOS)

개요

  • MySQL과 PostgreSQL에 각각 테이블 생성 후 데이터 넣기
  • 아래와 같이 병렬적으로 실행하는 예제

Screenshot 2025-05-03 at 12.40.31 PM.png

시나리오

  • 테이블과 데이터를 추가하되 두 DB에 대한 접근 방식이 다름
    • MySQL은 직접적으로 넣기
    • PostgreSQL은 Airflow를 통해서 데이터 넣기

환경설정

  • 사전에 MySQL과 PostgreSQL이 설치가 이미 되어 있음을 가정한다.

파이썬 설치

프로젝트 초기화

  • 프로젝트 디렉터리에서 다음과 순차적으로 실행
$ uv venv -p 3.11
$ source .venv/bin/activate

Airflow 설치

  • 먼저 환경변수를 설정한다.
$ export AIRFLOW_HOME=$(pwd)/airflow

셸 스크립트 작성 및 실행

  • 다음과 같이 셸 스크립트 작성
    • 파일명 : install_airflow.sh
AIRFLOW_VERSION=2.8.0

# Python 버전을 3.11로 고정 설정
PYTHON_VERSION="3.11"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example this would install 3.0.0 with python 3.11: https://raw.githubusercontent.com/apache/airflow/constraints-3.0.0/constraints-3.11.txt

uv pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
uv pip install -r requirements.txt
  • 라이브러리 목록
    • 파일명 : requirements.txt
pandas
numpy
seaborn
matplotlib
requests
psycopg2-binary
streamlit
plotly
pytz
mysql-connector-python
apache-airflow-providers-postgres==5.7.1
  • 셸 스크립트를 실행한다.
chmod +x install_airflow.sh
./install_airflow.sh

MySQL, PostgreSQL 설정확인

MySQL 설정

  • 설정
    • host : localhost
    • user : root
    • password : evan1234
    • port : 3306
    • Schema : airflow_db

PostgreSQL 설정

  • 설정
    • host : localhost
    • user : postgres
    • password : 1234
    • port : 5432
    • Database : python_dataengineering

최종 결과 예시

  • 정상적으로 입력이 되면 다음과 같이 출력이 될 것이다. (미리 확인용)
  • MySQL

Screenshot 2025-05-03 at 1.43.48 PM.png

Docker-Compose와 Dockerfile을 활용한 Flask-MySQL 연동 예제

개요

  • Docker-Compose와 Dockerfile의 주요 기능을 이해한다.
  • 각 파일의 위치와 주요 기능을 이해한다.

전체 프로젝트 파일 디렉터리

  • 본 프로젝트의 전체 코드는 다음과 같다.
  • 실제 코드 작성을 해야하는 곳은 다음과 같다.
    • app.py
    • requirements.txt
    • init.sql
    • docker-compose.yml
    • Dockerfile
docker_kubernetes_flask/
├── app/
│   ├── __init__.py
│   ├── app.py
│   └── requirements.txt
├── db/
│   ├── init.sql
│   └── data/ (This will be created by Docker)
├── docker-compose.yml
└── Dockerfile

사전준비

  • 사전에 Docker는 Desktop 설치가 되어 있다고 가정한다.
  • 코드 편집을 위해서는 Visual Studio Code를 활용한다.

Docker가 익숙하지 않은 사람들을 위한 1줄 요약

  • MySQL 설치하고, Python 설치하고, 두개 또 연동해야 하고, CLI 명령어 또 각각 입력하는거 다 자동화 해줄게요!!
  • 즉, 자동화에 익숙해지자!

docker-compose와 Dockerfile 간단 비교

  • docker-compose.yml : python 컨테이너와 mysql 컨테이너를 각각 한꺼번에 구성하도록 스크립트를 작성함
  • Dockerfile : 여기에서는 python 개발환경을 구성함
  • docker-compose.yml에서 Dockerfile을 호출하여 개발환경을 만들도록 지시할 수 있음

전체 코드 흐름 1줄 요약

  • From MySQL to Python Flask

init.sql과 app.py 간단 설명

  • 각 두개의 파일은 사전에 미리 작성을 해둔다.

MySQL : init.sql

  • SQL 코드는 데이터베이스와 테이블을 생성하고, 테이블에 데이터를 삽입하는 작업을 수행.
    • test_db 데이터베이스 생성
    • users 테이블 생성
    • 간단하게 이름 생성
CREATE DATABASE IF NOT EXISTS test_db;

USE test_db;

CREATE TABLE IF NOT EXISTS users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL
);

INSERT INTO users (name) VALUES ('Evan');
INSERT INTO users (name) VALUES ('Sara');
INSERT INTO users (name) VALUES ('Lotto');

Python : app.py

  • Python 코드는 Flask 웹 애플리케이션을 설정하여 MySQL 데이터베이스에 연결하고, 사용자 데이터를 JSON 형식으로 반환하는 작업을 수행.
from flask import Flask, jsonify
import mysql.connector
import os

app = Flask(__name__)

def get_db_connection():
    connection = mysql.connector.connect(
        host='mysql',
        user='root',
        password='example',
        database='test_db'
    )
    return connection

@app.route('/')
def index():
    connection = get_db_connection()
    cursor = connection.cursor()
    cursor.execute('SELECT * FROM users')
    users = cursor.fetchall()
    cursor.close()
    connection.close()
    users_list = [{"id": user[0], "name": user[1]} for user in users]
    return jsonify(users_list)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
  • 좀더 구체적으로 살펴본다.

step 01 - 라이브러리 불러오기

  • Flask: Flask 웹 애플리케이션 프레임워크를 불러오기
  • jsonify: 데이터를 JSON 형식으로 변환하여 HTTP 응답으로 반환하는 데 사용
  • mysql.connector: MySQL 데이터베이스에 연결하기 위해 사용
from flask import Flask, jsonify
import mysql.connector
import os

step 02 - Flask Application Setup

  • Flask 애플리케이션 인스턴스를 생성. __name__은 현재 모듈의 이름을 전달하여 Flask 애플리케이션을 생성하는 데 사용.
app = Flask(__name__)

step 03 - Database Connection Function

  • get_db_connection 함수는 MySQL 데이터베이스에 연결하고, 연결 객체를 반환.
  • host, user, password, database 매개변수는 데이터베이스에 연결하기 위한 정보
def get_db_connection():
    connection = mysql.connector.connect(
        host='mysql',
        user='root',
        password='example',
        database='test_db'
    )
    return connection

step 04 - Index Route

  • @app.route('/'): 해당 Decorator는 URL (’/’)에 대한 요청을 처리하는 index 함수를 정의.
  • index 함수의 내용은 다음과 같이 구성됨
    • get_db_connection을 호출하여 데이터베이스에 연결.
    • 연결 객체에서 커서를 생성하고, SELECT * FROM users 쿼리를 실행하여 users 테이블의 모든 데이터를 가져오기
    • 데이터를 가져온 후 커서와 연결을 닫기
    • users 데이터를 List Comprehension을 사용하여 딕셔너리 형태로 변환합니다. 각 사용자에 대해 idname 키를 가지는 딕셔너리를 생성.
      • 이 부분은 별도의 HTML 소스코드를 넣지 않기 위해서 진행한 것이니, 해당 자세한 내용을 보기를 원한다면 Flask 웹개발로 더 공부할 것 권장
    • 변환된 리스트를 jsonify를 사용하여 JSON 형식으로 반환.
@app.route('/')
def index():
    connection = get_db_connection()
    cursor = connection.cursor()
    cursor.execute('SELECT * FROM users')
    users = cursor.fetchall()
    cursor.close()
    connection.close()
    users_list = [{"id": user[0], "name": user[1]} for user in users]
    return jsonify(users_list)

step 05 - Running the Application

  • 모듈이 직접 실행될 때만 Flask 애플리케이션을 실행
    • app.run 메소드를 호출하여 애플리케이션을 시작.
    • host='0.0.0.0': 애플리케이션이 모든 네트워크 인터페이스에서 접근 가능하도록 설정.
    • port=5000: 애플리케이션이 5000번 포트에서 실행되도록 설정.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Python : requirements.txt

  • 주요 라이브러리 설치 위해 필요한 라이브러리 2개만 설치
  • 추후에 독자가 라이브러리 추가 가능
Flask
mysql-connector-python

Docker: Dockerfile

  • Dockerfile은 Python 애플리케이션을 컨테이너화하기 위한 스크립트
  • 다른 파일과 달리 확장자명이 없다는 것에 주의
  • 다양한 옵션에 대해 설명하도록 한다.
FROM python:3.10-slim

WORKDIR /app

COPY app/requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt

COPY app/ .

EXPOSE 5000

CMD ["python", "app.py"]

step 01 - FROM

FROM python:3.10-slim
  • Base Image를 선택한다. 해당 이미지를 선택하려면 Docker Hub 검색창에서 확인 (Login 필수)
  • 각 Base Image를 선택하면 관련 Tag가 존재하며 여기에서 Tags 확인해서 입력하도록 한다.
  • Docker Official Image 를 선택한다.

Screenshot 2024-07-01 at 8.16.09 AM.png

Pandas DataFrame to MySQL Database using iris Data

개요

  • 이전 강의에 이어서 진행한다. (MySQL Select Clause via Python)
  • 임의의 Pandas 데이터 프레임에서 MySQL DB로 추가하는 코드를 작성한다.

주요 라이브러리 설치

  • 아래와 같이 주요 라이브러리를 설치한다.
    • MySQL과 관련된 주요 Python 라이브러리를 설치한다.
pip install mysql-connector mysql-connector-python pymysql SQLAlchemy seaborn pandas

코드 작성(mysql-connector)

  • 아래와 같이 코드를 작성한다.
# 파일명 : db.py
import mysql.connector
import pandas as pd
import seaborn as sns

mydb = mysql.connector.connect(
    host = "localhost", 
    user = "root", 
    passwd = "evan",
    database = "muldb"
)

print(mydb)

iris_df = sns.load_dataset('iris')

my_cursor = mydb.cursor()

create_table_query  = """
   CREATE TABLE IF NOT EXISTS iris (
    id INT AUTO_INCREMENT PRIMARY KEY,
    sepal_length FLOAT,
    sepal_width FLOAT,
    petal_length FLOAT,
    petal_width FLOAT,
    species VARCHAR(255)
    ); 
"""

my_cursor.execute(create_table_query)

insert_query = """
    INSERT INTO iris (sepal_length, sepal_width, petal_length, petal_width, species) 
    VALUES (%s, %s, %s, %s, %s)
"""

data = [tuple(x) for x in iris_df.to_numpy()]

for row in data:
    my_cursor.execute(insert_query, row)

# DB Commit
mydb.commit()

# 종료
my_cursor.close()
mydb.close()
  • 이번에는 Python 파일을 실행한다.
$ python db.py 
<mysql.connector.connection_cext.CMySQLConnection object at 0x0000025FFC155BD0>
  • 이번에는 MySQL Workbench에서 확인한다.

Untitled

MySQL Select Clause via Python

개요

데이터 조회

  • 다음 코드를 작성한다.
import mysql.connector

mydb = mysql.connector.connect(
    host = "localhost", 
    user = "root", 
    passwd = "evan", 
    database = "mulcampdb"
)

print(mydb)

my_cursor = mydb.cursor()

query = """
   SELECT * FROM users;
"""

my_cursor.execute(query)

result = my_cursor.fetchall()
for row in result:
    print(row)
    
print("완료")
  • 파일을 실행한다.
$ python database.py 
<mysql.connector.connection_cext.CMySQLConnection object at 0x000001FE5A985F10>
('Evan', 'Evan@gmail.com', 30, 1)
('Evan', 'Evan@gmail.com', 30, 2)
('Mary', 'Mary@gmail.com', 20, 3)
('Sara', 'Sara@gmail.com', 25, 4)
완료

pandas 데이터프레임으로 변환

  • 주어진 결괏값으로 pandas 데이터프레임으로 변환한다.
  • 먼저 라이브러리를 설치한다.
$ pip install pandas
  • 데이터 같이 코드를 작성한다.
import mysql.connector
import pandas as pd 

mydb = mysql.connector.connect(
    host = "localhost", 
    user = "root", 
    passwd = "evan", 
    database = "mulcampdb"
)

print(mydb)

my_cursor = mydb.cursor(dictionary=True)

query = """
   SELECT * FROM users;
"""

my_cursor.execute(query)

result = my_cursor.fetchall()
for row in result:
    print(row)

df = pd.DataFrame(result)
print(df)
print("완료")
  • 파일을 실행한다.
$ python database.py 
<mysql.connector.connection_cext.CMySQLConnection object at 0x000002C4C47D5490>
{'name': 'Evan', 'email': 'Evan@gmail.com', 'age': 30, 'user_id': 1}
{'name': 'Evan', 'email': 'Evan@gmail.com', 'age': 30, 'user_id': 2}
{'name': 'Mary', 'email': 'Mary@gmail.com', 'age': 20, 'user_id': 3}
{'name': 'Sara', 'email': 'Sara@gmail.com', 'age': 25, 'user_id': 4}
   name           email  age  user_id
0  Evan  Evan@gmail.com   30        1
1  Evan  Evan@gmail.com   30        2
2  Mary  Mary@gmail.com   20        3
3  Sara  Sara@gmail.com   25        4
완료

코드 복기

  • 커서를 생성하는 코드를 다시 확인해본다.
    • 이 코드는 dictionary=True는 조회 결과를 Dictionary 형태로 받기 위한 설정이다.
my_cursor = mydb.cursor(dictionary=True)
  • 이 코드로 인해, pandas DataFrame으로 변환하는 것이 매우 쉬워진다.
result = my_cursor.fetchall()
df = pd.DataFrame(result)
print(df)

MySQL Table Creation and Insert Data via Python

개요

테이블 생성

  • 아래 코드를 작성하면 테이블이 생성된다.
import mysql.connector

mydb = mysql.connector.connect(
    host = "localhost", 
    user = "root", 
    passwd = "evan", 
    database = "mulcampdb"
)

print(mydb)

my_cursor = mydb.cursor()
query = """
    CREATE TABLE users (
        name VARCHAR(255)
        , email VARCHAR(255)
        , age INTEGER(10)
        , user_id INTEGER AUTO_INCREMENT PRIMARY KEY
    );
"""
my_cursor.execute(query)
my_cursor.execute("SHOW TABLES;")
for table in my_cursor:
    print(table[0])
  • 파일을 실행한다.
$ python database.py 
<mysql.connector.connection_cext.CMySQLConnection object at 0x0000028942314F90>
users
  • MySQL Workbench에 생성된 테이블이 만들어지는지 확인한다.

Untitled

Connect To Database in Python

개요

  • Python과 MySQL을 연동하도록 한다.
  • 프로젝트 폴더에 가상환경이 설치가 되어 있는 것으로 가정한다.
  • MySQL은 기 설치가 되어 있는 것으로 가정한다.

라이브러리 설치

  • Python과 MySQL을 연동해주는 라이브러리 종류는 다양하게 있다.
$ pip install mysql-connector mysql-connector-python

파일 작성

  • 간단하게 파일을 작성한다.
import mysql.connector

mydb = mysql.connector.connect(
    host = "localhost", 
    user = "root", 
    passwd = "evan"
)

print(mydb)
  • 파일을 실행한다.
$ python database.py 
<mysql.connector.connection_cext.CMySQLConnection object at 0x000002BF4E606090>
(venv) 

Python 코드 활용하여 DB 생성

  • 이번에는 코드를 활용하여 Schema를 생성한다.
import mysql.connector

mydb = mysql.connector.connect(
    host = "localhost", 
    user = "root", 
    passwd = "evan"
)

print(mydb)

my_cursor = mydb.cursor()
query = """
    CREATE DATABASE mulcampdb
"""

my_cursor.execute(query)

print("완료")
  • 파일을 실행한다.
$ python database.py 
<mysql.connector.connection_cext.CMySQLConnection object at 0x0000020BB6504F50>
완료
  • MySQL Workbench에서 DB가 생성되었는지 확인한다.

Untitled

MySQL 삭제, 재설치 가이드 on M1

개요

  • M1에서 MySQL을 설치 하고 Workbench에 접속하는 과정을 설명한다.
  • 데이터 로드 시, ASCII 에러 과정 해결하는 방법도 살펴본다. (임시방편)

사전학습

  • brew 명령어를 알고 있는 분에 한해 작성을 하였다.

주의

  • 아래 코드 복사할 시, $ 는 제외 후 복사한다.

MySQL 실행 확인 후 프로세스 Kill

  • 먼저 MySQL이 실행중인지를 확인한다.
$ brew services list
Name  Status  User File
mysql started evan ~/Library/LaunchAgents/homebrew.mxcl.mysql.plist
  • 서비스를 강제 종료한다.
$ brew services stop mysql
Stopping `mysql`... (might take a while)
==> Successfully stopped `mysql` (label: homebrew.mxcl.mysql)

관련 파일 삭제

  • 재 설치를 위해서는 기존에 설치된 파일 목록 등을 모두 제거한다.
$ which mysql
/opt/homebrew/bin/mysql
  • Homebrew로 기존에 설치했다면 아래 명령어를 실행한다.
$ brew uninstall --force mysql
Uninstalling mysql... (323 files, 312.8MB)
  • 다음 라인을 한줄씩 실행한다.
sudo rm -rf /usr/local/mysql
sudo rm -rf /usr/local/bin/mysql
sudo rm -rf /usr/local/var/mysql
sudo rm -rf /usr/local/Cellar/mysql
sudo rm -rf /usr/local/mysql*
sudo rm -rf /tmp/mysql.sock.lock
sudo rm -rf /tmp/mysqlx.sock.lock
sudo rm -rf /tmp/mysql.sock
sudo rm -rf /tmp/mysqlx.sock
sudo rm ~/Library/LaunchAgents/homebrew.mxcl.mysql.plist
sudo rm -rf /Library/StartupItems/MySQLCOM
sudo rm -rf /Library/PreferencePanes/My*

MySQLWorkbench 삭제

  • Applications 폴더에서 해당 MySQLWorkbench 파일을 찾아 삭제한다.

Screenshot 2024-04-02 at 1.20.31 PM.png

MySQL Workbench File Import Error in Mac

개요

  • MySQL Workbench에서 File을 불러올 때 에러가 발생했을 때 대처 요령을 소개한다.
  • Workbench에서 File을 불러올 때 가끔 아래와 같은 아래가 발생하곤 한다.

Screenshot 2023-10-04 at 11.33.28 AM.png

  • 해결방법은 MySQL Shell Script에서 직접 파일을 불러오는 방식이다.

MySQL 재접속

  • 기존에 Workbench에 접속해 있었다면 우선 종료를 한다.

Screenshot 2023-10-04 at 11.35.16 AM.png

  • 재접속 전 Edit Connection 버튼을 클릭한다.

    Screenshot 2023-10-04 at 11.36.03 AM.png

  • Advanced Tab을 클릭한다.

    • Others 메뉴에서 OPT_LOCAL_INFILE=1 을 입력한다.
    • Test Connection 버튼을 클릭하여 정상적으로 접속이 되는지 재 확인한다.

    Screenshot 2023-10-04 at 11.36.43 AM.png

MySQL Workbench ERD - 1

개요

  • MySQL Workbench를 통해 ERD 작업을 수행하도록 한다.

Step 1 - 메뉴 선택

  • 상단 메뉴에서 Database > Reverse Engineer 를 선택한다.

Untitled

Step 2 - Reverse Engineer Database

  • Next 버튼을 클릭한다.

Untitled

Step 3 - Connect to MySQL Server

  • 팝업창에서 root 또는 사용자 비밀번호 입력 후, 확인 버튼을 누르면 아래와 같이 연결이 될 것이다.

Untitled

Step 4 - Schema 선택

  • classicmodels 스키마를 선택한다.

Untitled

Step 4 - Connect to MySQL Server

  • 비밀번호를 입력한다.

Untitled

MySQL Error Code 1175 해결

개요

  • Delete 문법을 진행하는데, 아래와 같은 에러가 발생하였다.

Untitled

  • 옵션 설정을 변경하여 코드를 재실행한다.

Safe Mode 끄기

  • 아래 코드를 실행한다.
SET SQL_SAFE_UPDATES = 0;
  • Edit > Preferences > SQL Editor > Other 에서 Safe Updates 체크되어 있는 것을 삭제한다.

Untitled

코드 재 실행

  • Delete 코드를 재 실행한다.
DELETE FROM tasks WHERE start_date = DATE('2023-09-14');

Untitled

  • 정상적으로 Delete 코드가 실행된 것을 확인할 수 있다.