Ch11 Powerful SQL Pattern

Page content

1. 구글 클라우드 설정

본격적인 빅쿼리 실습에 앞서서, Python과 연동하는 예제를 준비하였다. 빅쿼리 시작에 앞서서 선행적으로 클라우드 사용을 해야 한다.

  1. 만약 GCP 프로젝트가 없다면, 계정을 연동한다. Go to Cloud Resource Manager
  2. 그리고, 비용결제를 위한 카드를 등록한다. Enable billing
  3. 마지막으로 BigQuery API를 사용해야 하기 때문에 빅쿼리 API 사용허가를 내준다.Enable BigQuery

위 API를 이용하지 않으면 Python 또는 R과 연동해서 사용할 수는 없다. 자주 쓰는것이 아니라면 비용은 거의 발생하지 않으니 염려하지 않아도 된다. 비용관리에 대한 자세한 내용은 BigQuery 권장사항: 비용 관리에서 확인하기를 바란다.

2. 사용자 계정 인증

구글 코랩을 사용해서 인증 절차를 밟도록 한다. 아래 소스코드는 변경시키지 않는다. 아래 절차대로 진행하면 된다. Gmail 인증 절차와 비슷하다.

from google.colab import auth
auth.authenticate_user()
print('Authenticated')
Authenticated

3. 쿼리 기본

빅쿼리는 기본적으로 데이터 웨어하우스의 기능을 가지고 있다. Structured(데이터프레임) & Semi-Structured(JSON)과 같은 DB를 지원하고 있다. Create, Read, Update, Delete 기능을 지원한다.

빅쿼리는 데이터 분석을 위한 기본적인 도구이기 때문에, 웹/앱을 활용한 마케팅 분석이 필요한 업종에서는 어쩌면 반드시 가져가야할 일종의 언어이기도 하다.

기본적으로 SQL문법을 지원하며, BigQuery 나름의 문법을 또한 구성하고 있다. 이제 본격적으로 코드를 작성해보자.

현재 작성하는 모든 코드는 Python-BigQuery로 연동하여 작성하려고 한다. 이제 간단하게 쿼리를 작성해보자.

개요

빅데이터에서 쿼리는 일종의 시간이다. 그리고 클라우드에서는 비용이다. 따라서 cost-effective하게 쿼리를 작성하는 것이 중요하며 또한 필요하다.

이를 위해서는 nested fields를 활용하는 것이 flattened data를 활용하는 것보다 좋다.

이 때 필요한 것이 Chapter 07-10 배운 개념들이다. 이제 실전에서 STRUCT, UNNEST, ARRAY_AGG를 활용해서 쿼리를 조회하려고 한다.

먼저 아래 Table을 확인해보자.

위 결과물을 조금 더 시각적으로 보고 싶다면, 꼭 자신의 빅쿼리 콘솔에서 확인하는 것을 권합니다.

from google.cloud import bigquery
from tabulate import tabulate

project_id = 'your_project_id'
client = bigquery.Client(project=project_id)

temp = client.query('''
  SELECT 
    sid, season, number, basin, subbasin, name, iso_time, nature, latitude, longitude
  FROM `bigquery-public-data.noaa_hurricanes.hurricanes` 
  WHERE season = "1971" and basin = "WP"
  LIMIT 4
  ''').to_dataframe()

print(tabulate(temp, tablefmt="pipe", headers="keys"))
|    | sid           |   season |   number | basin   | subbasin   | name   | iso_time                  | nature   |   latitude |   longitude |
|---:|:--------------|---------:|---------:|:--------|:-----------|:-------|:--------------------------|:---------|-----------:|------------:|
|  0 | 1971141N07156 |     1971 |       49 | WP      | MM         | DINAH  | 1971-05-29 00:00:00+00:00 | TS       |    18.9143 |     110.643 |
|  1 | 1971160N10135 |     1971 |       54 | WP      | MM         | FREDA  | 1971-06-17 00:00:00+00:00 | TS       |    20.4    |     115.586 |
|  2 | 1971189N12130 |     1971 |       66 | WP      | MM         | KIM    | 1971-07-13 00:00:00+00:00 | TS       |    18.1857 |     106.886 |
|  3 | 1971190N09141 |     1971 |       68 | WP      | MM         | JEAN   | 1971-07-15 18:00:00+00:00 | TS       |    14.4857 |     116.229 |

주요 과제

  • 여기서 하려는 것은 usa_sshs (better known as category)의 최대값을 찾는 것이다.
  • 연도는 2010년이다.
  • basin=NA는 허리케인이 처음 도착했을 때는 의미한다.
  • 원하는 결과값은 허리케인이 지도에서 (27.1, -60.1)에 있을 때, 예를 들면 Hurricane Danielle reached Category 4 at 18:00 UTC on 2010-08-01 라고 말하는 것이다.

결과 테이블을 보자.

|    | name     |   category | iso_time                  |   latitude |   longitude |
|---:|:---------|-----------:|:--------------------------|-----------:|------------:|
|  0 | DANIELLE |          4 | 2010-08-27 18:00:00+00:00 |    27.1    |    -60.1    |
|  1 | EARL     |          4 | 2010-08-30 18:00:00+00:00 |    19      |    -64.2    |
|  2 | IGOR     |          4 | 2010-09-12 18:00:00+00:00 |    17.7    |    -46.1    |
|  3 | JULIA    |          4 | 2010-09-15 09:00:00+00:00 |    17.2549 |    -31.8002 |
|  4 | KARL     |          3 | 2010-09-17 09:00:00+00:00 |    19.6874 |    -95.2465 |

즉, 이렇게 테이블을 만들어 내려고 하는 것이다. 어떻게 작성할까?

Step 1. History of Hurricane Location.

기본적으로 Hurriance의 동선을 확인해야 하는데, hurricane 이라는 가상의 테이블을 만든다.

조건은 2010년 기준이다.

방법 1. Try Group By

우선, basinseason을 기준으로 필터링을 한다.

temp = client.query('''
  SELECT 
    NAME, iso_time, latitude, longitude, usa_sshs
  FROM `bigquery-public-data.noaa_hurricanes.hurricanes` 
  WHERE season = "2010" and basin = "NA"
  LIMIT 4
  ''').to_dataframe()

print(tabulate(temp, tablefmt="pipe", headers="keys"))
|    | NAME   | iso_time                  |   latitude |   longitude |   usa_sshs |
|---:|:-------|:--------------------------|-----------:|------------:|-----------:|
|  0 | OTTO   | 2010-10-07 06:00:00+00:00 |       23.4 |       -68.3 |         -2 |
|  1 | OTTO   | 2010-10-06 12:00:00+00:00 |       22.6 |       -67.8 |         -2 |
|  2 | OTTO   | 2010-10-06 06:00:00+00:00 |       22   |       -67.2 |         -2 |
|  3 | OTTO   | 2010-10-06 18:00:00+00:00 |       23   |       -68.1 |         -2 |
  • 우리가 여기에서 원하는 것은 Each Hurricane의 위치별 리스트를 확보하는 것이다.

  • 문제는 위 쿼리에서 GROUP sid를 하게 되면 작동하지 않는다. (한번 해 보면 된다!) 이 때 필요한 것이, ARRAY_AGGSTRUCT을 사용해야 한다.

방법 2. ARRAY_AGG와 STRUCT

이번에는 Powerful한 ARRAY_AGGSTRUCT를 사용했다.

temp = client.query('''
  SELECT 
    MIN(NAME) AS name, 
    ARRAY_AGG(STRUCT(iso_time, latitude, longitude, usa_sshs) ORDER BY iso_time ASC) as track
  FROM `bigquery-public-data.noaa_hurricanes.hurricanes` 
  WHERE season = "2010" and basin = "NA"
  GROUP BY sid
  LIMIT 5
  ''').to_dataframe()

print(temp)
       name                                              track
0      OTTO  [{'iso_time': 2010-10-06 06:00:00+00:00, 'lati...
1     COLIN  [{'iso_time': 2010-08-02 12:00:00+00:00, 'lati...
2      EARL  [{'iso_time': 2010-08-24 00:00:00+00:00, 'lati...
3      IGOR  [{'iso_time': 2010-09-08 06:00:00+00:00, 'lati...
4  DANIELLE  [{'iso_time': 2010-08-21 12:00:00+00:00, 'lati...

위 쿼리에 대해서 하나씩 생각해보자.

  • 우선, sid(=storm id)별로 Group By를 해야 각 Storm 별로 정렬이 될 것이다. 그 다음 집계함수를 사용해서 원하는 값을 추출할 것이다.
  • 그런데, 우리가 원하는 것은 Storm ID당 각 셀안의 모든 행을 유지하려면 ARRAY_AGG가 필요하다.
  • 단 한개의 필드만 조회하는 것이 아닌, 시계열과 관련된 모든 필드가 같이 들어가야 하는데, 이 때 필요한 것이 STRUCT이다.
  • 그리고, time을 기준으로 정렬하면 된다. track field를 확인해보자. 그동안 배웠던 익숙한 결과물이 나타냈다. 위 with 를 사용하여 쿼리문의 결과값을 hurricanes라고 임시 저장한다.

Step 2. Maximum Category

이제 임시 저장된 hurricanes를 사용하여 우선 category field를 만들어야 한다. 이 때 UNNEST가 사용될 것이다.

temp = client.query('''
  with hurricanes AS (
    SELECT 
      MIN(NAME) AS name
      , ARRAY_AGG(STRUCT(iso_time, latitude, longitude, usa_sshs) ORDER BY iso_time ASC) as track
    FROM `bigquery-public-data.noaa_hurricanes.hurricanes` 
    WHERE season = "2010" and basin = "NA"
    GROUP BY sid
  )

  SELECT 
    name
    , track
    , (SELECT MAX(usa_sshs) FROM UNNEST(track)) AS category
  FROM 
    hurricanes
  ORDER BY category DESC
  ''').to_dataframe()

print(temp)
         name                                              track  category
0        EARL  [{'iso_time': 2010-08-24 00:00:00+00:00, 'lati...         4
1        IGOR  [{'iso_time': 2010-09-08 06:00:00+00:00, 'lati...         4
2    DANIELLE  [{'iso_time': 2010-08-21 12:00:00+00:00, 'lati...         4
3       JULIA  [{'iso_time': 2010-09-12 06:00:00+00:00, 'lati...         4
4        KARL  [{'iso_time': 2010-09-13 18:00:00+00:00, 'lati...         3
5       TOMAS  [{'iso_time': 2010-10-29 06:00:00+00:00, 'lati...         2
6       PAULA  [{'iso_time': 2010-10-11 00:00:00+00:00, 'lati...         2
7        ALEX  [{'iso_time': 2010-06-24 18:00:00+00:00, 'lati...         2
8     RICHARD  [{'iso_time': 2010-10-19 18:00:00+00:00, 'lati...         2
9        OTTO  [{'iso_time': 2010-10-06 06:00:00+00:00, 'lati...         1
10      SHARY  [{'iso_time': 2010-10-28 18:00:00+00:00, 'lati...         1
11       LISA  [{'iso_time': 2010-09-20 00:00:00+00:00, 'lati...         1
12      COLIN  [{'iso_time': 2010-08-02 12:00:00+00:00, 'lati...         0
13     NICOLE  [{'iso_time': 2010-09-28 00:00:00+00:00, 'lati...         0
14     GASTON  [{'iso_time': 2010-09-01 00:00:00+00:00, 'lati...         0
15    HERMINE  [{'iso_time': 2010-09-05 06:00:00+00:00, 'lati...         0
16      FIONA  [{'iso_time': 2010-08-30 00:00:00+00:00, 'lati...         0
17     BONNIE  [{'iso_time': 2010-07-22 06:00:00+00:00, 'lati...         0
18    MATTHEW  [{'iso_time': 2010-09-23 12:00:00+00:00, 'lati...         0
19  NOT_NAMED  [{'iso_time': 2010-08-10 06:00:00+00:00, 'lati...        -1
20  NOT_NAMED  [{'iso_time': 2010-07-07 06:00:00+00:00, 'lati...        -1
  • 매우 명확하게 category column 형태로 나온 것을 확인할 수 있다.
  • 우선 track은 배열로 구성 되어 있다.
  • UNNEST()를 사용하면 배열이 table 형태로 변환된다. 이 때, track.* 형태로 field명이 추출되는 것을 볼 수 있다. 그리고, 난 뒤, MAX(usa_sshs)를 구하는 것이 위 쿼리에 대한 설명이다.

Step 3. 최종 결과물 출력

마찬가지로, Step 2의 결과물을 다시 category_hurricanes라고 저장한뒤 재 사용하도록 한다.

temp = client.query('''
WITH hurricanes AS (
SELECT
  MIN(NAME) AS name,
  ARRAY_AGG(STRUCT(iso_time, latitude, longitude, usa_sshs) ORDER BY iso_time ASC) AS track
FROM
  `bigquery-public-data.noaa_hurricanes.hurricanes`
WHERE
  season = '2010' AND basin = 'NA'
GROUP BY
  sid
),

cat_hurricane AS (
SELECT name, track, (SELECT MAX(usa_sshs) FROM UNNEST(track))  AS category
from hurricanes
ORDER BY category DESC
)

SELECT 
  name
  , category
  , (SELECT AS STRUCT iso_time, latitude, longitude
   FROM UNNEST(track) 
   WHERE usa_sshs = category ORDER BY iso_time LIMIT 1).*
FROM cat_hurricane
ORDER BY category DESC, name ASC
LIMIT 5
  ''').to_dataframe()

print(temp)
       name  category                  iso_time  latitude  longitude
0  DANIELLE         4 2010-08-27 18:00:00+00:00   27.1000   -60.1000
1      EARL         4 2010-08-30 18:00:00+00:00   19.0000   -64.2000
2      IGOR         4 2010-09-12 18:00:00+00:00   17.7000   -46.1000
3     JULIA         4 2010-09-15 09:00:00+00:00   17.2549   -31.8002
4      KARL         3 2010-09-17 09:00:00+00:00   19.6874   -95.2465
(SELECT AS STRUCT iso_time, latitude, longitude
   FROM UNNEST(track) 
   WHERE usa_sshs = category ORDER BY iso_time LIMIT 1).*
  • 위 구문에서 .*을 사용하지 않으면 ARRAY 형태로 출력된다.
  • 만약에 LIMIT 1로 지정하지 않으면 에러가 난다.

결론

SQL에서 시계열 데이터를 다루는 것은 생각보다 쉽지 않다. 각 ID당, 초당 또는 분당 단위로 다르게 바뀌는 과정속에서 집계를 구하려면 기존의 Group BY 방식으로는 생각보다 쉽게 적용이 되지 않을 가능성이 크다.

이 때, 필요한 것이 배열(=ARRAY)을 활용하는 것이다. 차후에 더 나은 예제가 있거나 또한 발견하면 그 때 다시한번 공유하도록 한다.

작은 도움이 되기를 바란다.

Reference

Lakshmanan, V. (2018). “Exploring a powerful SQL pattern: ARRAY_AGG, STRUCT and UNNEST” Retrieved from https://www.freecodecamp.org/news/exploring-a-powerful-sql-pattern-array-agg-struct-and-unnest-b7dcc6263e36/.