PostgreSQL

Airflow 활용한 DB Insert 예제 (M1, MacOS)

개요

  • MySQL과 PostgreSQL에 각각 테이블 생성 후 데이터 넣기
  • 아래와 같이 병렬적으로 실행하는 예제

Screenshot 2025-05-03 at 12.40.31 PM.png

시나리오

  • 테이블과 데이터를 추가하되 두 DB에 대한 접근 방식이 다름
    • MySQL은 직접적으로 넣기
    • PostgreSQL은 Airflow를 통해서 데이터 넣기

환경설정

  • 사전에 MySQL과 PostgreSQL이 설치가 이미 되어 있음을 가정한다.

파이썬 설치

프로젝트 초기화

  • 프로젝트 디렉터리에서 다음과 순차적으로 실행
$ uv venv -p 3.11
$ source .venv/bin/activate

Airflow 설치

  • 먼저 환경변수를 설정한다.
$ export AIRFLOW_HOME=$(pwd)/airflow

셸 스크립트 작성 및 실행

  • 다음과 같이 셸 스크립트 작성
    • 파일명 : install_airflow.sh
AIRFLOW_VERSION=2.8.0

# Python 버전을 3.11로 고정 설정
PYTHON_VERSION="3.11"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example this would install 3.0.0 with python 3.11: https://raw.githubusercontent.com/apache/airflow/constraints-3.0.0/constraints-3.11.txt

uv pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
uv pip install -r requirements.txt
  • 라이브러리 목록
    • 파일명 : requirements.txt
pandas
numpy
seaborn
matplotlib
requests
psycopg2-binary
streamlit
plotly
pytz
mysql-connector-python
apache-airflow-providers-postgres==5.7.1
  • 셸 스크립트를 실행한다.
chmod +x install_airflow.sh
./install_airflow.sh

MySQL, PostgreSQL 설정확인

MySQL 설정

  • 설정
    • host : localhost
    • user : root
    • password : evan1234
    • port : 3306
    • Schema : airflow_db

PostgreSQL 설정

  • 설정
    • host : localhost
    • user : postgres
    • password : 1234
    • port : 5432
    • Database : python_dataengineering

최종 결과 예시

  • 정상적으로 입력이 되면 다음과 같이 출력이 될 것이다. (미리 확인용)
  • MySQL

Screenshot 2025-05-03 at 1.43.48 PM.png

PostgreSQL 설치 on Windows 11 (2025)

PostgreSQL 설치파일 다운로드

image.png

image.png

설치

  • 설치파일 실행 (관리자 권한)

image.png

image.png

image.png

image.png

  • password는 evan1234

image.png

  • 포트번호 확인

image.png

image.png

image.png

image.png

image.png

image.png

  • 프로그램 검색 창에서 pgAdmin 4 프로그램 열기

image.png

  • 실행되는지 확인

image.png

환경변수 설정

  • 경로 복사

image.png

  • 시스템 환경 변수 열기 후 복사하기
C:\Program Files\PostgreSQL\17\bin

image.png

  • CMD나 PowerShell에서 확인
C:\Users\campus3S043>psql --version
psql (PostgreSQL) 17.4

Heroku를 활용한 카카오챗봇 배포 - DB조회편

읽기 전 공지

  • 본 글은 2022년 11월 28일까지만 유효합니다. 무료 버전이 사라지기 때문에, 앞으로 어떻게 될지는 현재 글 쓰는 시점에서는 모릅니다. 이 부분에 주의해서 참고 하시기를 바랍니다.

강의 홍보

Heroku를 활용한 카카오챗봇 배포 - 응용편

읽기 전 공지

  • 본 글은 2022년 11월 28일까지만 유효합니다. 무료 버전이 사라지기 때문에, 앞으로 어떻게 될지는 현재 글 쓰는 시점에서는 모릅니다. 이 부분에 주의해서 참고 하시기를 바랍니다.

강의 홍보

Heroku를 활용한 카카오챗봇 배포 - 인사말편

읽기 전 공지

  • 본 글은 2022년 11월 28일까지만 유효합니다. 무료 버전이 사라지기 때문에, 앞으로 어떻게 될지는 현재 글 쓰는 시점에서는 모릅니다. 이 부분에 주의해서 참고 하시기를 바랍니다.

강의 홍보

Heroku를 활용한 배포 - DB 연결편

읽기 전 공지

  • 본 글은 2022년 11월 28일까지만 유효합니다. 무료 버전이 사라지기 때문에, 앞으로 어떻게 될지는 현재 글 쓰는 시점에서는 모릅니다. 이 부분에 주의해서 참고 하시기를 바랍니다.

강의 홍보

Flask Heroku Pandas Postgres 튜토리얼

강의 홍보

개요

  • Flask 기본적인 작동 원리를 배운다.
  • Postgres와 SQLAlchemy를 활용한다.
  • Heroku에 배포를 진행한다.

사전준비

  • Github에 각 개인에게 맞는 Github Repo를 생성한다.
    • 주의 : 반드시 Unique하게 작성해야 한다.
  • 가상환경 설정을 진행한다.
  • PostgreSQL DB 설정은 다음을 참조한다.
virtualenv venv
  • 주요 라이브러리를 설치한다.
pip install Flask psycopg2 SQLAlchemy pandas gunicorn

배포

  • 프로젝트 시작 전에는 항상 선 배포를 하고 시작한다.
  • app.py를 만들어본다.
# -*- coding: utf-8 -*-

from flask import Flask

app = Flask(__name__)

@app.route("/")
def index():
    return "Hello World!"
  • 3개의 파일이 필요하다.

PostgreSQL Installation on WSL2 and Windows

개요

  • WSL2에서 PostgreSQL을 설치한다.
  • pgAdmin은 Windows에 설치한다.

터미널 업그레이드

  • 먼저 WSL 터미널을 열고, Ubuntu 패키지를 모두 업데이트 및 업그레이드를 한다.
$ sudo apt update
[sudo] password for evan:
Hit:1 https://artifacts.elastic.co/packages/7.x/apt stable InRelease
Get:2 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Hit:3 http://archive.ubuntu.com/ubuntu focal InRelease
Get:4 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Get:5 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
Get:6 http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages [1712 kB]
Get:7 http://archive.ubuntu.com/ubuntu focal-updates/universe amd64 Packages [916 kB]
Fetched 2963 kB in 5s (600 kB/s)
Reading package lists... Done
Building dependency tree
Reading state information... Done
All packages are up to date.
  • 이번에는 업그레이드를 해본다.
$ sudo apt-get upgrade
Reading package lists... Done
Building dependency tree
Reading state information... Done
Calculating upgrade... Done
0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded.

PostgreSQL Installation in WSL2

  • 이번에는 WSL2에서 PostgreSQL을 설치한다. 설치가 종료되면, 반드시 버전을 확인한다.
$ sudo apt install postgresql postgresql-contrib
$ psql --version
psql (PostgreSQL) 12.9 (Ubuntu 12.9-0ubuntu0.20.04.1)
  • 설치 이후에는 Database를 접근가능하도록 활성화해야 한다.
    • 포트가 활성화 되어 있지 않다면 아래와 같은 메시지가 나타날 것이다.
$ sudo service postgresql status
12/main (port 5432): down
  • 이번에는 활성화를 해보도록 한다. 온라인이라는 메시지가 나타난다면 활성화가 되었다는 것을 의미한다.
$ sudo service postgresql start
 * Starting PostgreSQL 12 database server
$ sudo service postgresql status
12/main (port 5432): online
  • 이번에는 활성화된 데이터베이스를 종료시킨다.
$ sudo service postgresql stop
 * Stopping PostgreSQL 12 database server                                                                        [ OK ]
$ sudo service postgresql status
12/main (port 5432): down

사용자 계정 Password 설정

  • 기본적으로 admin 사용자로 등록이 되어 있다. 보통 DB 초기 세팅 시에는 패스워드를 입력받아야 한다. (예: evan)
$ sudo passwd postgres
New password:
Retype new password:
passwd: password updated successfully
  • 여기까지 했다면, WSL2에서 추가로 설정할 것은 더 없다.

pgAdmin Installation on Windows

Untitled

Python with PostgreSQL - Create Database

PostgreSQL 및 Python 연동 예제

라이브러리 설치

  • 우선 설치를 진행한다.
$ pip install psycopg2-binary
Downloading psycopg2_binary-2.9.2-cp310-cp310-win_amd64.whl (1.2 MB)
     |████████████████████████████████| 1.2 MB 6.4 MB/s
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.2

현재 Database 확인

  • cmd 파일 창을 열고, 현재 DB 리스트를 확인한다.
    • \list or l: 전체 databases 리스트를 조회한다.
C:\Users\user>psql --username=postgres
postgres 사용자의 암호:
psql (13.5)
도움말을 보려면 "help"를 입력하십시오.
postgres=# \l
데이터베이스 목록
   이름    |  소유주  | 인코딩 |     Collate      |      Ctype       |      액세스 권한
-----------+----------+--------+------------------+------------------+-----------------------
 postgres  | postgres | UTF8   | Korean_Korea.949 | Korean_Korea.949 |
 template0 | postgres | UTF8   | Korean_Korea.949 | Korean_Korea.949 | =c/postgres          +
           |          |        |                  |                  | postgres=CTc/postgres
 template1 | postgres | UTF8   | Korean_Korea.949 | Korean_Korea.949 | =c/postgres          +
           |          |        |                  |                  | postgres=CTc/postgres
(3개 행)

Database 생성

# import the psycopg2 database adapter for PostgreSQL
from psycopg2 import connect, extensions

# connect
def createDB():
    conn = connect(
        database="postgres", user='postgres', password='your_password', host='127.0.0.1', port='5432'
    )

    # object type: psycopg2.extensions.connection
		# object type: conn 객체 유형을 확인한다. 
    print("\ntype(conn):", type(conn))

    # 명령 처리 함수 구현
    cursor = conn.cursor()

    # Create Database Creation
		# 먼저 DB_NAME을 생성한다. 
		
    DB_NAME = "testDB"
	
    # get the isolation leve for autocommit
		# autocommit을 설정한다. 
    autocommit = extensions.ISOLATION_LEVEL_AUTOCOMMIT
    print("ISOLATION_LEVEL_AUTOCOMMIT:", extensions.ISOLATION_LEVEL_AUTOCOMMIT)

    # set the isolation level for the connection's cursors
    # will raise ActiveSqlTransaction exception otherwise
    conn.set_isolation_level(autocommit)

    # Create Database
    # instantiate a cursor object from the connection
    cursor = conn.cursor()

    # use the execute METHOD to make a SQL Request
    cursor.execute("CREATE DATABASE " + str(DB_NAME))
    print("Database created successfully...!")

    # close the cursor to avoid memory leaks
    cursor.close

    # Connection Closed to avoid memory leaks
    conn.close()

if __name__ == "__main__":
    createDB()
  • DB 생성시 중요한 건, autocommit을 설정해줘야 한다는 것이다. 만약 해당 설정을 삭제하고 재 실행하면, psycopg2.errors.ActiveSqlTransaction: CREATE DATABASE cannot run inside a transaction block 과 같은 에러 메시지가 나타나게 될 것이다.

현재 Database 확인

  • cmd 파일 창을 열고, 현재 DB 리스트를 확인한다.
    • \list or l: 전체 databases 리스트를 조회한다.
    • testdb가 생성된 것을 확인할 수 있다.
postgres=# \l
                                      데이터베이스 목록
   이름    |  소유주  | 인코딩 |     Collate      |      Ctype       |      액세스 권한
-----------+----------+--------+------------------+------------------+-----------------------
 postgres  | postgres | UTF8   | Korean_Korea.949 | Korean_Korea.949 |
 template0 | postgres | UTF8   | Korean_Korea.949 | Korean_Korea.949 | =c/postgres          +
           |          |        |                  |                  | postgres=CTc/postgres
 template1 | postgres | UTF8   | Korean_Korea.949 | Korean_Korea.949 | =c/postgres          +
           |          |        |                  |                  | postgres=CTc/postgres
 testdb    | postgres | UTF8   | Korean_Korea.949 | Korean_Korea.949 |
(4개 행)

Database 삭제

  • 이번에는 Database를 삭제하는 코드를 작성하고, 실행하여 testdb를 삭제하도록 한다.
# import the psycopg2 database adapter for PostgreSQL
from psycopg2 import connect, extensions

# delete
def deleteDB():
    conn = connect(
        database="postgres", user='postgres', password='your_password', host='127.0.0.1', port='5432'
    )

    # object type: psycopg2.extensions.connection
    print("\ntype(conn):", type(conn))

    # SQL Query
    DB_NAME = "testDB"
    # get the isolation leve for autocommit
    autocommit = extensions.ISOLATION_LEVEL_AUTOCOMMIT
    print("ISOLATION_LEVEL_AUTOCOMMIT:", extensions.ISOLATION_LEVEL_AUTOCOMMIT)

    # set the isolation level for the connection's cursors
    # will raise ActiveSqlTransaction exception otherwise
    conn.set_isolation_level(autocommit)

    # Create Database
    # instantiate a cursor object from the connection
    # 명령 처리 함수 구현
    cursor = conn.cursor()

    # use the execute METHOD to make a SQL Request
    cursor.execute("DROP DATABASE " + str(DB_NAME))
    print("Database Drop successfully...!")

    # close the cursor to avoid memory leaks
    cursor.close()

    # Connection Closed to avoid memory leaks
    conn.close()

if __name__ == "__main__":
    # createDB()
    deleteDB()

현재 Database 확인

  • cmd 파일 창을 열고, 현재 DB 리스트를 확인한다.
    • \list or l: 전체 databases 리스트를 조회한다.
    • testdb 가 삭제된 것을 확인할 수 있다.
C:\Users\user>psql --username=postgres
postgres 사용자의 암호:
psql (13.5)
도움말을 보려면 "help"를 입력하십시오.
postgres=# \l
데이터베이스 목록
   이름    |  소유주  | 인코딩 |     Collate      |      Ctype       |      액세스 권한
-----------+----------+--------+------------------+------------------+-----------------------
 postgres  | postgres | UTF8   | Korean_Korea.949 | Korean_Korea.949 |
 template0 | postgres | UTF8   | Korean_Korea.949 | Korean_Korea.949 | =c/postgres          +
           |          |        |                  |                  | postgres=CTc/postgres
 template1 | postgres | UTF8   | Korean_Korea.949 | Korean_Korea.949 | =c/postgres          +
           |          |        |                  |                  | postgres=CTc/postgres
(3개 행)

소결

  • 해당 함수에서 불필요하게 재반복해서 사용하는 코드들이 있다.
  • 이러한 재반복 코드는 Class로 정의해서 사용하면 훨씬 더 간결하게 작성할 수 있다.
  • 다음번에는 Class로 정의해서 코드를 작성하도록 한다.

Airflow를 활용한 PostgreSQL에서 Elasticsearch로 데이터 마이그레이션 예제

강의 홍보

개요

  • Airflow를 활용하여 PostgreSQL에 저장된 데이터를 디스크로 다운로드 받고, 그리고 그 파일을 다시 읽어서 Elasticsearch에 저장하도록 한다.
  • 전체적인 흐름은 getData from PostgreSQL >> insertData to Elasticsearch 로 저장할 수 있다.

전체 코드 실행

  • 우선 전체 코드를 실행하도록 한다.
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd
import psycopg2 as db
from elasticsearch import Elasticsearch

def queryPostgresql():
    conn_string="dbname='python_dataengineering' host='localhost' user='postgres' password='evan'"
    conn = db.connect(conn_string)
    df = pd.read_sql("select name, city from users", conn)
    df.to_csv("dags/postgresqldata.csv")
    print("----------Data Saved------------")

def insertElasticsearch():
    es=Elasticsearch()
    df = pd.read_csv("dags/postgresqldata.csv")
    print(df.head())
    for i, r in df.iterrows():
        doc = r.to_json()
        res = es.index(index="frompostgresql", doc_type="doc", body=doc)
        print(res)

default_args = {
    'owner': 'evan',
    'start_date': dt.datetime(2021, 9, 15),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('MyDBdag',
         default_args = default_args,
         schedule_interval=timedelta(minutes=5),
         ) as dag:

    print_starting = BashOperator(task_id='starting',
                                  bash_command='echo "I am reading the PostgreSQL now....."')

    getData = PythonOperator(task_id='QueryPostgreSQL',
                             python_callable=queryPostgresql)

    insertData = PythonOperator(task_id="InsertDataElasticsearch",
                                python_callable=insertElasticsearch)

print_starting >> getData >> insertData
(venv) $ airflow webserver
(venv) $ airflow scheduler

결과 확인

  • 브라우저로 [http://localhost:8080](http://localhost:8080) 에 접속한다.
  • MyDBdag 를 실행 후, 클릭한다.

airflow_01.png