Programming

파이썬을 활용한 엘라스틱서치에서 데이터 추출

강의 홍보

개요

  • 데이터를 질의하는 방법과 데이터를 삽입하는 방법은 동일하다.
  • 다만, 이 때에는 search 메서드를 사용하다.
  • 또한, doc 문서도 조금 다르다.
  • 기본적으로 SQL 과 문법이 다르기 때문에 공식문서를 확인한다.

실행

실습 1 - 전체 데이터 확인

  • 먼저 아래코드를 복사한 뒤 붙여넣기 하여 실행시켜보도록 한다.
    • 결괏값까지 직접 확인하도록 한다.
from elasticsearch import Elasticsearch
from pandas.io.json import json_normalize

es = Elasticsearch()

doc={"query":{"match_all":{}}}
res=es.search(index="users", body=doc, size=10)
print(res['hits']['hits'])

# 각 문서의 `_source`필드 출력만 가능
# for doc in res['hits']['hits']:
#   print(doc['_source'])

# pandas DataFrame
print("----------")
df = json_normalize(res['hits']['hits'])
print(df)
(venv) $ python3 step04_3_elasticsearch_query.py 
/Users/evan/Desktop/data_engineering_python/venv/lib/python3.8/site-packages/elasticsearch/connection/base.py:208: ElasticsearchWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.14/security-minimal-setup.html to enable security.
  warnings.warn(message, category=ElasticsearchWarning)
[{'_index': 'users', '_type': 'doc', '_id': '4Vam5HsBtoEC6_MiP2z3', '_score': 1.0, '_source': {'name': 'Toni Nguyen', 'street': '6060 Justin Freeway Suite 557', 'city': 'East Aaronstad', 'zip': '43029'}}, {'_index': 'users', '_type': 'doc', '_id': '41bN5HsBtoEC6_Mig2w3', '_score': 1.0, '_source': {'name': 'Jennifer Barr', 'street': '02693 Laura Views Apt. 582', 'city': 'Ericside', 'zip': '50021'}}, {'_index': 'users', '_type': 'doc', '_id': '5FbN5HsBtoEC6_Mig2w4', '_score': 1.0, '_source': {'name': 'Alex White', 'street': '674 Mcclure Islands', 'city': 'Richardfort', 'zip': '91321'}}, {'_index': 'users', '_type': 'doc', '_id': '5VbN5HsBtoEC6_Mig2w4', '_score': 1.0, '_source': {'name': 'Michael Melendez', 'street': '1732 Andrews Lakes Suite 023', 'city': 'West Rachaelhaven', 'zip': '96479'}}, {'_index': 'users', '_type': 'doc', '_id': '5lbN5HsBtoEC6_Mig2w4', '_score': 1.0, '_source': {'name': 'Dennis Moore', 'street': '4901 Barber Turnpike Suite 009', 'city': 'Lake Abigail', 'zip': '91137'}}, {'_index': 'users', '_type': 'doc', '_id': '51bN5HsBtoEC6_Mig2w4', '_score': 1.0, '_source': {'name': 'Angela Santiago', 'street': '2635 Duncan Isle Apt. 292', 'city': 'Garyton', 'zip': '74345'}}, {'_index': 'users', '_type': 'doc', '_id': '6FbN5HsBtoEC6_Mig2w4', '_score': 1.0, '_source': {'name': 'Cynthia Randall', 'street': '8738 Johnson Underpass', 'city': 'Brianshire', 'zip': '02178'}}, {'_index': 'users', '_type': 'doc', '_id': '6VbN5HsBtoEC6_Mig2w4', '_score': 1.0, '_source': {'name': 'James Schneider', 'street': '14248 Gregory Loaf', 'city': 'West Lindamouth', 'zip': '47982'}}, {'_index': 'users', '_type': 'doc', '_id': '6lbN5HsBtoEC6_Mig2w4', '_score': 1.0, '_source': {'name': 'Jonathan George', 'street': '8183 Flores Springs Apt. 129', 'city': 'South Jeffreyland', 'zip': '61791'}}, {'_index': 'users', '_type': 'doc', '_id': '61bN5HsBtoEC6_Mig2w4', '_score': 1.0, '_source': {'name': 'William Cooper', 'street': '338 Jeffrey Canyon Apt. 624', 'city': 'Wilkersonside', 'zip': '01065'}}]
----------
step04_3_elasticsearch_query.py:16: FutureWarning: pandas.io.json.json_normalize is deprecated, use pandas.json_normalize instead
  df = json_normalize(res['hits']['hits'])
  _index _type                   _id  _score      _source.name                  _source.street       _source.city _source.zip
0  users   doc  4Vam5HsBtoEC6_MiP2z3     1.0       Toni Nguyen   6060 Justin Freeway Suite 557     East Aaronstad       43029
1  users   doc  41bN5HsBtoEC6_Mig2w3     1.0     Jennifer Barr      02693 Laura Views Apt. 582           Ericside       50021
2  users   doc  5FbN5HsBtoEC6_Mig2w4     1.0        Alex White             674 Mcclure Islands        Richardfort       91321
3  users   doc  5VbN5HsBtoEC6_Mig2w4     1.0  Michael Melendez    1732 Andrews Lakes Suite 023  West Rachaelhaven       96479
4  users   doc  5lbN5HsBtoEC6_Mig2w4     1.0      Dennis Moore  4901 Barber Turnpike Suite 009       Lake Abigail       91137
5  users   doc  51bN5HsBtoEC6_Mig2w4     1.0   Angela Santiago       2635 Duncan Isle Apt. 292            Garyton       74345
6  users   doc  6FbN5HsBtoEC6_Mig2w4     1.0   Cynthia Randall          8738 Johnson Underpass         Brianshire       02178
7  users   doc  6VbN5HsBtoEC6_Mig2w4     1.0   James Schneider              14248 Gregory Loaf    West Lindamouth       47982
8  users   doc  6lbN5HsBtoEC6_Mig2w4     1.0   Jonathan George    8183 Flores Springs Apt. 129  South Jeffreyland       61791
9  users   doc  61bN5HsBtoEC6_Mig2w4     1.0    William Cooper     338 Jeffrey Canyon Apt. 624      Wilkersonside       01065
  • 이제 코드를 살펴본다.
    • 우선 doc={"query":{"match_all":{}}} 에서 match_all 은 주어진 문서의 모든 문서를 질의하는 요청에 해당함.
    • search 는 크게 index 명은 문서 객체를 의미하며 body 쿼리 내용을 저장하는 곳이다. 그리고, 마지막으로 결과집합의 크기도 지정한다.
    • 검색된 문서들을 출력한다.
      • 출력시 해당 쿼리는 JSON 객체로 반환됨을 유의한다.
    • json_normalize 함수는 JSON 객체를 데이터프레임으로 변환할 때 주로 사용한다.

실습 2 - 질의 응용 (1)

  • 이번에는 match_all 대신에 특정 이름만 출력하도록 한다.
  • 먼저 파이썬 코드를 작성해본다.
    • 여기서 봐야 하는 코드는 doc 이하 구문이다.
from elasticsearch import Elasticsearch

es = Elasticsearch()
doc = {"query": {"match":{"name":"Toni Nguyen"}}}
res = es.search(index="users", body=doc, size=10)
print(res['hits']['hits'][0]['_source'])
  • 이제 해당 코드를 실행하면 특정 이름과 연관된 결괏값만 확인할 수 있을 것이다.
python3 step04_4_elasticsearch_query_match.py 
/Users/evan/Desktop/data_engineering_python/venv/lib/python3.8/site-packages/elasticsearch/connection/base.py:208: ElasticsearchWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.14/security-minimal-setup.html to enable security.
  warnings.warn(message, category=ElasticsearchWarning)
{'name': 'Toni Nguyen', 'street': '6060 Justin Freeway Suite 557', 'city': 'East Aaronstad', 'zip': '43029'}

실습 3 - 질의 응용 (2)

  • 이번에는 Boolean 연산자 및 필터를 이용해서 좀 더 정밀하게 검색을 하도록 한다.
  • 이 때에는 bool 필드에 must 나, must not, should 를 지정하고 filter 필드에 조건을 지정하는 식이면 좋다.
  • 먼저 파이썬 코드를 확인해본다.
from elasticsearch import Elasticsearch

es = Elasticsearch()
doc = {
    "query": {
        "bool": {
            "must": {
                "match": {"city": "East Aaronstad"}
            },
            "filter": {
                "term":{"zip":"43029"}
            }
        }
    }
}

res = es.search(index="users", body=doc, size=10)
print(res['hits']['hits'][0]['_source'])
  • 먼저, city: East Aaronstad 와 매칭데는 모든 데이터가 조회된다.
  • 그 후에, filter 조건문에서 zip 코드가 43029 번호만 남게 하면 된다.
  • 이제 파일을 실행하여 결괏값을 확인한다.
    • 기존에 이름을 통해 검색한 결과와 동일하게 나타나는 것을 확인할 수 있다.
(venv) $ python3 step04_5_elasticsearch_query_bool.py 
/Users/evan/Desktop/data_engineering_python/venv/lib/python3.8/site-packages/elasticsearch/connection/base.py:208: ElasticsearchWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.14/security-minimal-setup.html to enable security.
  warnings.warn(message, category=ElasticsearchWarning)
{'name': 'Toni Nguyen', 'street': '6060 Justin Freeway Suite 557', 'city': 'East Aaronstad', 'zip': '43029'}

실습 4 - scroll 메서드 활용한 대형 결과 집합 처리

  • 검색 결과 집합의 크기를 10으로 설정했다. 그러나, 만약 10,000개 이상일 경우에는 부하가 걸리는 등 약간의 어려움이 노출이 된다.
    • 보통 이런 경우, 데이터를 여러번 나누어서 가져와야 하는데, 스크롤 함수를 통해서 해결이 가능하다.
  • 먼저 코드를 통해서 다수의 데이터를 가져오도록 한다.
from elasticsearch import Elasticsearch

es = Elasticsearch()
res = es.search(
    index='users',
    doc_type='doc',
    scroll='20m',
    size=500,
    body={'query':{'match_all':{}}}
)

sid = res['_scroll_id']
size = res['hits']['total']['value']
while (size > 0):
    res = es.scroll(scroll_id= sid, scroll='20m')
    sid = res['_scroll_id']
    size = len(res['hits']['hits'])
    idx = 0
    for doc in res['hits']['hits']:
        print(doc['_source'])
        if idx == 5:
            break
        idx += 1
  • 먼저 search() 에서 size는 500개로 설정한다.
  • _scroll_id 는 scroll 호출 시 찌정한다.
  • while 반복문을 이용하여 스크롤링을 시작한다. 이 때, 이 루프는 size가 0보다 큰 동안만 반복하며, size가 0이면 더 이상의 데이터가 없다는 것을 의미 한다.
  • sid와 size는 데이터를 더 가져오기 위해 새로운 스크롤 ID와 크기를 저장하는 것을 말한다.
    • for 반복문 내의 if 조건문은 삭제 후 실행하면 모든 데이터가 나타나게 된다.
  • 이제 파일을 실행해본다.
(venv) $ python3 step04_6_elasticsearch_scroll.py 
/Users/evan/Desktop/data_engineering_python/venv/lib/python3.8/site-packages/elasticsearch/connection/base.py:208: ElasticsearchWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.14/security-minimal-setup.html to enable security.
  warnings.warn(message, category=ElasticsearchWarning)
/Users/evan/Desktop/data_engineering_python/venv/lib/python3.8/site-packages/elasticsearch/connection/base.py:208: ElasticsearchWarning: [types removal] Specifying types in search requests is deprecated.
  warnings.warn(message, category=ElasticsearchWarning)
{'name': 'Roberta Garrett', 'street': '622 Mullen Run', 'city': 'Johnbury', 'zip': '99188'}
{'name': 'Dr. Gregory Pitts', 'street': '2713 Anne Bridge', 'city': 'Tracychester', 'zip': '15536'}
{'name': 'Parker Rodriguez', 'street': '99259 Michelle Islands Suite 573', 'city': 'Grayview', 'zip': '94752'}
{'name': 'James Key', 'street': '6371 Claudia Neck', 'city': 'Zoetown', 'zip': '56028'}
{'name': 'John Manning', 'street': '22124 White Motorway', 'city': 'West John', 'zip': '79730'}
{'name': 'Sara Davidson', 'street': '339 Brian Landing', 'city': 'Larsonfurt', 'zip': '83689'}
{'name': 'Michael Sanders', 'street': '98018 Smith Trafficway Apt. 221', 'city': 'North Lisa', 'zip': '16332'}
{'name': 'Diana Schultz', 'street': '1902 Kristin Dale Apt. 983', 'city': 'Harveybury', 'zip': '77036'}
{'name': 'Amber Pratt', 'street': '40199 Harry Brooks Suite 081', 'city': 'Sharonmouth', 'zip': '08817'}
{'name': 'Michelle Cook', 'street': '24013 Alvarado Rest', 'city': 'Evanside', 'zip': '26897'}
{'name': 'Carmen Spears', 'street': '02924 Pearson Turnpike', 'city': 'West Heathermouth', 'zip': '20586'}
{'name': 'Kimberly Baker', 'street': '2997 Sloan Parkway Apt. 846', 'city': 'Tracyshire', 'zip': '04234'}
{'name': 'James Wilkinson', 'street': '063 Mark Mills Suite 670', 'city': 'Lewisburgh', 'zip': '69009'}
{'name': 'Valerie Fitzpatrick', 'street': '48374 Smith Throughway', 'city': 'Gonzalezhaven', 'zip': '86867'}
{'name': 'Ricardo Kim', 'street': '40378 Riley Trafficway', 'city': 'Lake Jared', 'zip': '05818'}
{'name': 'Charles Walker', 'street': '038 Reid Centers', 'city': 'North Travis', 'zip': '44111'}
{'name': 'Suzanne Mendoza', 'street': '36816 Owens Streets Suite 897', 'city': 'Elizabethhaven', 'zip': '26540'}
{'name': 'Adam Hamilton', 'street': '19219 Davis Falls', 'city': 'New Ryan', 'zip': '43903'}

References

파이썬과 엘라스틱서치 DB 연동

강의 홍보

개요

  • NoSQL 데이터베이스 시스템의 하나인 Elasticsearch 를 다루는 방법을 설명한다.
  • NoSQL 은 데이터를 행들과 열들로 저장하지 않는 데이터베이스를 말한다.
  • 대개 JSON문서 형태로 저장하고, SQL이 아닌 절의 언어를 주로 사용한다.

설치

  • 먼저 설치를 진행한다.
(venv) $ pip3 install elasticsearch
Collecting elasticsearch
  Downloading elasticsearch-7.14.1-py2.py3-none-any.whl (363 kB)
     |████████████████████████████████| 363 kB 5.7 MB/s 
Requirement already satisfied: certifi in /Users/evan/Desktop/data_engineering_python/venv/lib/python3.8/site-packages (from elasticsearch) (2021.5.30)
Requirement already satisfied: urllib3<2,>=1.21.1 in /Users/evan/Desktop/data_engineering_python/venv/lib/python3.8/site-packages (from elasticsearch) (1.26.6)
Installing collected packages: elasticsearch
Successfully installed elasticsearch-7.14.1
  • 설치 이후에는 현재 버전을 확인하기 위해 아래와 같이 .py 형태로 작성 후, 터미널에서 실행한다.
import elasticsearch
print(elasticsearch.__version__)
(venv) $ python3 step04_1_elasticsearch.py 
(7, 14, 1)
  • 버전까지 확인했다면, 파이썬에서 Elasticsearch 을 실행할 수 있다.

실습 1 - 데이터 삽입

  • 데이터를 Elasticsearch 에 삽입하는 소스코드를 생성해본다.
import elasticsearch
from elasticsearch import Elasticsearch
from faker import Faker

fake = Faker()

print(elasticsearch.__version__)
es = Elasticsearch()

# 데이터 생성 및 삽입
doc={"name": fake.name(),"street": fake.street_address(), "city": fake.city(),"zip":fake.zipcode()}
res=es.index(index="users",doc_type="doc",body=doc)
print(res['result']) # created
(venv) $ python3 step04_1_elasticsearch.py 
(7, 14, 1)
/Users/evan/Desktop/data_engineering_python/venv/lib/python3.8/site-packages/elasticsearch/connection/base.py:208: ElasticsearchWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.14/security-minimal-setup.html to enable security.
  warnings.warn(message, category=ElasticsearchWarning)
/Users/evan/Desktop/data_engineering_python/venv/lib/python3.8/site-packages/elasticsearch/connection/base.py:208: ElasticsearchWarning: [types removal] Specifying types in document index requests is deprecated, use the typeless endpoints instead (/{index}/_doc/{id}, /{index}/_doc, or /{index}/_create/{id}).
  warnings.warn(message, category=ElasticsearchWarning)
created

실습 2 - 다수의 문서 삽입

  • 각 데이터 하나를 생성 후, 입력하는 방식이 아닌, 대량의 문서를 모두 만든 후, 한번엔 데이터를 추가하는 이른바 bulk 를 진행하도록 한다.
  • 우선 전체적인 코드를 추가한다 (파일명: step04_2_elasticsearch_bulk.py).
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from faker import Faker

fake=Faker()
es = Elasticsearch() #or pi {127.0.0.1}

actions = [
  {
    "_index": "users",
    "_type": "doc",
    "_source": {
	"name": fake.name(),
	"street": fake.street_address(), 
	"city": fake.city(),
	"zip":fake.zipcode()}
  }
  for x in range(998) # or for i,r in df.iterrows()
]

response = helpers.bulk(es, actions)
print(response)
  • 색인 이름과 형식을 "_index""_type" 으로 지정해야 한다.
  • "_source" 필드가 NoSQL 에 추가할 JSON 문서이다.
  • 반복문은 해당 문서를 999(색인이 0부터 998까지 반복)개의 데이터를 만든다는 뜻이다.
  • 마지막으로 bulk 를 실행하면 삽입이 진행된다.
(venv) $ python3 step04_2_elasticsearch_bulk.py 
/Users/evan/Desktop/data_engineering_python/venv/lib/python3.8/site-packages/elasticsearch/connection/base.py:208: ElasticsearchWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.14/security-minimal-setup.html to enable security.
  warnings.warn(message, category=ElasticsearchWarning)
/Users/evan/Desktop/data_engineering_python/venv/lib/python3.8/site-packages/elasticsearch/connection/base.py:208: ElasticsearchWarning: [types removal] Specifying types in bulk requests is deprecated.
  warnings.warn(message, category=ElasticsearchWarning)
(998, [])

실습 3 - 키바나 확인

  • 쿼리는 정상적으로 작동이 되었다. 실제 데이터가 잘 들어갔는지 확인하도록 한다. 이 때, 반드시 ElasticsearchKibana 가 먼저 구동이 되고 있어야 한다.
  • 아래 그림과 같이 왼쪽 상단 햄버거 메뉴 - 하단 Management 섹션의 Stack Management 를 클릭한다.

kibana_01.png

파이썬과 PostgreSQL DB 연동 예제

강의 홍보

개요

  • MacOS에서의 기본 설치 과정은 생략하도록 한다.
  • 새로운 DB를 생성하도록 한다.
  • 먼저 환경변수를 설정한다.
(venv) $ export PATH=/opt/homebrew/bin:$PATH:/Applications/Postgres.app/Contents/Versions/13/bin
  • 먼저 기본 데이터베이스에 연결한다.
(venv) $ psql postgres
psql (13.4)
Type "help" for help.

postgres=#
  • 위 상태에서 패스워드를 설정한다.
  • 필자는 evan을 패스워드로 입력하였다.
postgres=# ALTER USER postgres with PASSWORD 'evan';
ALTER ROLE
  • 그 후, 다시 Shell로 돌아온 뒤 python_dataengineering 데이터베이스를 생성한다.
    • 아래 코드는 Mac환경설정이 끝난 상황에서의 명령어이기 때문에 다를 수 있다. 또한, 다른 OS에서도 다를 수 있음을 유의한다.
    • 참고자료: 1.3. Creating Database
postgres=# \q
(venv) $ createdb python_dataengineering
(venv) $
  • 이번에는 pgAdmin에 접속한다.
  • 최초 접속 후, 아래와 같은 화면에서 PostgreSQL 서버를 추가하도록 한다.
  • 상단 대시보드 - 화면 중간 Add New Server 를 차례대로 클릭하면 입력해야 할 상자가 나타난다.
  • General 메뉴에서는 식별자 이름을 추가하고, Connection에서는 아래와 같이 입력을 한다.

/img/programming/2021/09/python_postgresql/python_postgresql

Apache Airflow를 활용한 CSV에서 JSON으로 변환하기

강의 홍보

개요

  • Apache Airflow에서 가장 중요한 개념은 DAG(Directed Acyclic Graph)이다.
  • DAG를 만들 시, Bash 스크립트 및 연산자(Operator)로 작업을 정의할 수 있다.
  • 이 때, 파이썬 함수로 조직화 한다.
  • Airflow 설치방법을 모른다면 다음 페이지에서 확인한다.

Step 01. Building a CSV to a JSON data pipeline

  • 우선 전체적인 코드를 확인한다.
  • 필자의 airflow 버전은 2.13이다.
  • 이 때, 몇몇 코드는 아래와 같이 발견될 수도 있다.
from airflow.operators.~~python_operator~~ import PythonOperator
## This module is deprecated. Please use `airflow.operators.python`.
  • 그럴 경우, 해당 명령어대로 수정하기를 바란다.
  • 위 부분이 교재와 코드 일부가 조금 다르며, 그 때마다 수정한다. 따라서, 교재를 보더라도 실제로는 아래 코드를 적용하거나, 아니면 airflow 1.x 버전으로 맞춰서 진행하기를 바란다.
import datetime as dt
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import pandas as pd

def csvToJson():
    df = pd.read_csv('dags/data.csv')
    for i, r in df.iterrows():
        print(r['name'])
    df.to_json('dags/fromAirflow.json', orient='records')

default_args = {
    'owner': 'evan',
    'start_date': dt.datetime(2020, 3, 18),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('MyCSVDAG',
         default_args=default_args,
         schedule_interval=timedelta(minutes=5),  # '0 * * * *',
         ) as dag:
    print_starting = BashOperator(task_id='starting',
                                  bash_command='echo "I am reading the CSV now....."')

    csvJson = PythonOperator(task_id='convertCSVtoJson',
                             python_callable=csvToJson)

print_starting >> csvJson
  • 위 코드에서 바꿔야 하는 것은 owner : evan 여기 부분만 변경하시면 된다.

Step 02. 소스코드 실행방법

  • 소스코드를 실행하려면 특정 폴더에 소스 파일과 그리고 csv 파일을 별도 폴더로 지정해줘야 한다.
  • 필자의 경우 파이썬 파일명은 ch02_airflow.py 이다.
  • 우선 Airflow 경로를 확인한다. (여기서 헤매면 계속 헤매이게 된다!)
(venv) $ echo $AIRFLOW_HOME
/Users/evan/Desktop/data_engineering_python/install_files/airflow
  • 이 때, airflow.cfg 파일에서 dags_folder 속성을 확인해본다.
  • 필자는 아래와 같이 지정이 되어 있다.
.
.
dags_folder = /Users/evan/Desktop/data_engineering_python/install_files/airflow/dags
.
.
  • 이 경로의 의미는 ch02_airflow.pydata.csv 파일이 해당 폴더 dags 안에 있어야 한다는 것이다.
  • 만약 dags 폴더가 없다면 새로 만들고, 두개의 파일을 추가하도록 한다.
  • 다시 말하지만, 대부분의 경우가 소스 파일 및 데이터 파일의 경로 문제가 대다수 에러의 원인임을 기억한다.
  • 이제 이 상태에서 아래 코드를 순차적으로 실행한다. 이 때, 터미널을 두개를 열어서 독립적으로 실행한다.
(venv) $ airflow webserver
(venv) $ airflow scheduler
  • 만약 접속이 안되거나, 그 외 대부분의 문제는 AIRFLOW 경로가 제대로 설정이 안된 경우이기 때문에, 그럴 경우에는 무조건 $AIRFLOW_HOME 설정을 다시 하도록 한다.
  • 설정 방법은 아래 코드를 참조한다.
(venv) $ export AIRFLOW_HOME="$(pwd)"
(venv) $ echo $AIRFLOW_HOME

Step 03. GUI에서 확인

  • http://0.0.0.0:8080/ 에 접속한 후, 로그인을 진행한다.
  • 그러면 아래와 같은 화면이 나타나면 정상적으로 실행이 된 것이다.

airflow_01.png

파이썬을 활용한 JSON 파일 입출력 예제 with faker

강의 홍보

개요

  • JSON은 (JavaScript Object Notataion)의 약자이며, 주로 API 호출 시에 사용한다.
  • JSON 데이터를 개별적인 파일 형태로 저장하기도 한다.
  • json 라이브러리를 활용하여 입출력을 진행하고, pandas 라이브러리를 통해서도 직접 불러오도록 한다.

JSON 파일 쓰기 전체 코드

from faker import Faker
import json
output=open('data.json','w')
fake=Faker()
all_df={}
all_df['records']=[]
for x in range(1000):
	data={"name":fake.name(),
          "age":fake.random_int(min=18, max=80, step=1),
          "street":fake.street_address(),
          "city":fake.city(),
          "state":fake.state(),
          "zip":fake.zipcode(),
          "lng":float(fake.longitude()),
          "lat":float(fake.latitude())}
	all_df['records'].append(data)	
json.dump(all_df,output)
  • faker 라이브러리 설명은 파이썬을 활용한 파일 입출력 예제 with faker 에서 확인한다.
  • all_df는 빈(empty) 딕셔너리 형태로 저장한다.
  • all_df["records"] 라는 이름의 key 값에 각 레코드들의 배열을 담도록 하였다.
  • json.dump 로 표현하도록 한다.
  • 이제 파일을 실행하여 json을 내보낸뒤 결과물을 확인하도록 한다.
(venv) $ python3 ch01_writejson.py
(venv) $ ls
... data.json
  • data.json 파일을 열어서 확인하면 아래와 같이 깔끔하게 정렬된 json 데이터를 확인할 수 있을 것이다.

writejson_01.png

파이썬을 활용한 CSV 파일 입출력 예제 with faker

강의 홍보

사전 작업

  • 우선 임시 데이터를 기록할 라이브러리인 faker 를 설치한다.
  • 흔히 쓰이는 필드들을 함수 하나로 쉽게 만들 수 있도록 지원한다.
(venv) $ pip3 install faker

데이터 생성하기 전체 코드

  • 필자는 [writecsv.py](http://writecsv.py) 형태로 저장하였다.
  • 먼저 한줄 씩 설명하면 다음과 같다.
from faker import Faker
import csv
output=open('mydata.csv', mode = 'w')
fake=Faker()
header=['name','age','street','city','state','zip','lng','lat']
mywriter=csv.writer(output)
mywriter.writerow(header)
for r in range(1000):
    mywriter.writerow([fake.name(),fake.random_int(min=18, max=80, step=1), fake.street_address(), fake.city(),fake.state(),fake.zipcode(),fake.longitude(),fake.latitude()])
output.close()
  • from faker import Faker 는 라이브러리를 불러오는 것이다.
  • import csv.csv 파일로 만들어주는 함수를 제공한다.
  • 이제 하나씩 살펴보도록 한다.
output=open('mydata.csv','w')
  • 파일을 쓰기 모드로 연다는 뜻이며, w 파일 쓰기 모드로 지정한다.
  • 읽기 전용 모드로 사용하려면 r ,
  • 기존 내용에 덧붙여 쓴다면, a,
  • 읽기와 쓰기가 모두 가능한 파일 모드는 r+ ,
  • 마지막으로 텍스트가 아닌 파일을 다룰 때는 b 붙인다. 예를 들면, 데이터를 바이트 단위로 기록하려면 mode = "wb" 로 지정한다.

fake = Faker()
  • fakeFaker() 클래스의 객체를 생성했다는 것을 의미한다.

header=['name','age','street','city','state','zip','lng','lat']
  • header라는 리스트 객체를 만들고, 그 안에 각종 데이터가 들어갈 컬럼명을 추가했다.

mywriter=csv.writer(output)
mywriter.writerow(header)
for r in range(1000):
    mywriter.writerow([fake.name(),fake.random_int(min=18, max=80, step=1), fake.street_address(), fake.city(),fake.state(),fake.zipcode(),fake.longitude(),fake.latitude()])
output.close()
  • csv.writer 객체를 생성하여 mywriter로 지정한다.
  • 그리고, header를 writerow()로 추가하면, 일종의 컬럼값이 우선 만들어진다.
  • 그 후에, 데이터 행을 파일에 기록하는 코드를 fake 내 함수를 반복문으로 활용하여 만든 것이다.
  • 모든 작업이 완료가 되었으면 output.close() 로 종료 한다.

(venv) $ python3 ch01_loadcsv.py
(venv) $ ls
ch01_loadcsv.py data.csv
  • 파일을 실행하면 위와 같이 데이터가 만들어진 것을 볼 수 있다.
  • 실제 아래와 같이 나온 것을 확인할 수 있다.

loadcsv_01.png

Kibana Installation

강의 홍보

개요

  • Elastic Search는 GUI를 제공하지 않고 API만 제공한다. 따라서, 시각화 도구인 키바나를 GUI로 사용하도록 하는 것이 특징이다.
  • 즉, 다시 말하면 Elastic Search 는 API 데이터만 제공할 뿐이고, 이를 가시적으로 보여주기 위해서는 Kibana를 설치해야 한다는 뜻이다.

설치

(venv) $ curl -O https://artifacts.elastic.co/downloads/kibana/kibana-7.14.1-darwin-x86_64.tar.gz
(venv) $ curl https://artifacts.elastic.co/downloads/kibana/kibana-7.14.1-darwin-x86_64.tar.gz.sha512 | shasum -a 512 -c -
(venv) $ tar -xzf kibana-7.14.1-darwin-x86_64.tar.gz
(venv) $ cd kibana-7.14.1-darwin-x86_64/

실행

  • 설치가 종료가 된 이후에는 실행을 한다.
  • 주의 : 이 때 주의해야 하는 것은 elastic search 가 먼저 실행이 되어 있어야 한다는 점이다.
  • http://localhost:5601 포트를 열도록 한다.
(venv) $ ./bin/kibana

/img/programming/2021/09/kibana_install/kibana_install

Elastic Search Engine Installation

강의 홍보

개요

설치

(venv) $ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.14.1-darwin-x86_64.tar.gz
(venv) $ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.14.1-darwin-x86_64.tar.gz.sha512
(venv) $ shasum -a 512 -c elasticsearch-7.14.1-darwin-x86_64.tar.gz.sha512 
(venv) $ tar -xzf elasticsearch-7.14.1-darwin-x86_64.tar.gz
(venv) $ cd elasticsearch-7.14.1/
(venv) $ ls
LICENSE.txt     NOTICE.txt      README.asciidoc bin             config          jdk.app         lib             logs            modules         plugins
  • 현재 경로에서 config/elasticsearch.yml 파일을 열고 노드와 클러스터 이름을 지정해보자.
# Use a descriptive name for your cluster:
#
cluster.name: dataEngineeringWithPython
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: OnlyMode
  • 이제 준비가 끝났다면, 다음 명령을 실행하여 일래스틱 서치를 진행해본다.
  • 사전에 Java가 설치가 되어 있어야 한다. 만약 설치가 안 되어 있다면, Apache NiFi Installation에서 설치과정을 확인한다.
(venv) $ ./bin/elasticsearch
warning: usage of JAVA_HOME is deprecated, use ES_JAVA_HOME
warning: usage of JAVA_HOME is deprecated, use ES_JAVA_HOME
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
[2021-09-07T11:09:50,528][INFO ][o.e.n.Node               ] [OnlyMode] version[7.14.1], pid[14599], build[default/tar/66b55ebfa59c92c15db3f69a335d500018b3331e/2021-08-26T09:01:05.390870785Z], OS[Mac OS X/11.4/x86_64], JVM[AdoptOpenJDK/OpenJDK 64-Bit Server VM/11.0.11/11.0.11+9]
.
.
.
[2021-09-07T11:10:20,876][INFO ][o.e.i.g.DatabaseRegistry ] [OnlyMode] database file changed [/var/folders/zq/ch7gky6n3rzgjf1pd0w2l35w0000gn/T/elasticsearch-1663630215408415345/geoip-databases/18vlOg1KR7q3JLo9G5S8SA/GeoLite2-City.mmdb], reload database...
  • 이제 http://localhost:9200을 열어본다.
  • 이 책에서 사용할 NoSQL DB가 준비가 되었다는 뜻을 의미한다.

elastic.png

Apache Airflow Installation

강의 홍보

개요

  • NiFi와 같은 용도의 소프트웨어이며, 현재 가장 인기 있는 오픈소스 데이터 파이프라인 도구라고 할 수 있다.
  • 보통은 시스템에 경로를 설정한다. 그런데, 본 장에서는 가상환경 설정 후 진행하는 것으로 했다.
  • 가상 환경은 virtualenv 를 통해서 진행한다.
  • 그 후에 가상 환경에 접속한다.
$ source venv/bin/activate
(venv) $

Step 01. 환경변수 설정

  • 우선 pip 으로 설치 하기에 앞서서 환경 변수를 임시로 설정한다.
  • 해당 환경 변수가 설정된 곳으로 airflow 설치 관련 폴더 및 파일들이 다운로드 될 것이다.
(venv) $ export AIRFLOW_HOME="$(pwd)"
(venv) $ echo $AIRFLOW_HOME
/Users/evan/Desktop/data_engineering_python/install_files/airflow

Step 02. 라이브러리 설치

  • 이제 airflow 설치를 진행한다.
  • 이때, 설치 명령어에 따른 옵션은 아래 그림에서 살펴보기를 바란다.

airflow_01.png

Apache NiFi Installation

강의 홍보

개요

  • 데이터 엔지니어링에 필요한 기본적인 인프라를 설치 진행하는 튜토리얼을 만들었다.
  • 기본적으로 교재에 충실하지만, 약 1년전에 쓰인 책이라, 최신 버전으로 업그레이드 하였다.

Apache NiFi 설치과정

  • 먼저 웹사이트에 방문하여 필요한 파일을 다운로드 받는다.
  1. wget을 이용해서 NiFi를 현재 디렉터리에 내려받는다.
$ wget https://downloads.apache.org/nifi/1.14.0/nifi-1.14.0-bin.tar.gz
--2021-09-06 13:10:55--  https://downloads.apache.org/nifi/1.14.0/nifi-1.14.0-bin.tar.gz
Resolving downloads.apache.org (downloads.apache.org)... 135.181.214.104, 135.181.209.10, 88.99.95.219
Connecting to downloads.apache.org (downloads.apache.org)|135.181.214.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1417663663 (1.3G) [application/x-gzip]
Saving to: ‘nifi-1.14.0-bin.tar.gz’

nifi-1.14.0-bin.tar.gz                      100%[==========================================================================================>]   1.32G  5.27MB/s    in 4m 13s
  1. .tar.gz 파일의 압축을 푼다.
$ tar -xvf nifi-1.14.0-bin.tar.gz
$ ls
nifi-1.14.0             nifi-1.14.0-bin.tar.gz
  1. nifi-1.14.0 의 디렉터리가 생겼을 것이며, 해당 디렉터리로 가서 다음 명령어를 실행한다.
$ cd nifi-1.14.0
$ bin/nifi.sh start
nifi.sh: JAVA_HOME not set; results may vary

Java home: 
NiFi home: /Users/evan/Desktop/data_engineering_python/install_files/nifi-1.14.0

Bootstrap Config File: /Users/evan/Desktop/data_engineering_python/install_files/nifi-1.14.0/conf/bootstrap.conf

The operation couldn’t be completed. Unable to locate a Java Runtime.
Please visit http://www.java.com for information on installing Java.
  • 자바가 이미 설치가 되어 있다면 정상적으로 실행이 된다.
  • 그러나, 자바가 설치가 되어 있지 않다면 위 에러와 같이 별도로 자바 환경 설치를 해야 한다.

3.1 자바 설치 및 환경변수 지정