建立自己的 COVID-19 儀表板,透過 Google Cloud 搞定整條資料管線

延續之前的文章,這篇主要是記錄我怎麼透過 Google Cloud 上面的服務來組合出一條資料管線,把 COVID-19 移動趨勢報告與確診數做關聯,打造一個可以每天自動更新的武漢肺炎 COVID-19 儀表板。

還沒收看過前一篇文章的朋友們,可以先點擊下方連結看一下 Google 與 Apple 的移動趨勢報告分析,會對這份資料與儀表板的建置有更深入的了解。

傳送門:分析 Google 與 Apple 的移動趨勢報告:COVID-19 Community Mobility Reports

使用省錢的方法建置

想要定時更新資料,開一台虛擬機,寫一個排程任務(Cron job)也是做得到,為什麼要大費周章搞一條資料管線呢?

省錢!省錢!省錢!

使用雲端服務必須要精打細算,一台虛擬機器開在那邊,要燒很多錢呀,我們平常老百姓承受不起。

對於一個小專案來說,功能簡單,沒有 migrate 的限制,這種定時做某件事情的工作,不用虛擬機器,直接使用 Cloud Function 就可以了。

當然,除了省錢之外,這次也是要練習使用這些雲端功能,想說之後需要用到的時候比才知道怎麼接 😎

建立資料管線

整條資料管線滿簡單的,只要在 GCP Console 上面操作就可以完成。

我原本還想說要來寫個腳本,做到 Infrastructure as Code,但想說只有一個 Python 程式在執行,直接上去手動更新檔案就好了,懶惰 ing 😂

整條水管長這樣。透過 Cloud Scheduler 發起事件,觸發 Cloud Function 執行資料下載與合併的工作,之後把結果丟到 Cloud Storage,最後在 Data Studio 顯示結果。

為了畫出這張圖,還特別 Google 一下,找到這個用於製作 GCP 架構圖的官方圖示集:架構圖解決方案圖示,可惜裡面沒有 Cloud Scheduler,要自己匯入。

covid19 gcp pipeline - 建立自己的 COVID-19 儀表板,透過 Google Cloud 搞定整條資料管線
使用到的 Google 服務們,按順序排排站

水管包含這幾個項目:

  1. Cloud Scheduler 每天早上會發起事件,發出一個事件給 Cloud Pub/Sub
  2. Cloud Pub/Sub 會觸發訂閱中的 Cloud Function
  3. Cloud Function 會執行一隻 Python 程式,處理資料下載與合併的工作
  4. Cloud Function 會將合併好的資料儲存到 Cloud Storage
  5. Data Studio 可以看到最新的資料

這篇文章都是參考這篇教學一步步完成的,如果你也想要打造資料自己的資料管線,歡迎參考:

教學文件:Using Pub/Sub to trigger a Cloud Function

1. Cloud Scheduler

Cloud Scheduler 是一個簡單的排程管理器,透過 crontab 的格式設定甚麼時候要觸發事件。稍微要注意自己建立的時區是甚麼,以免觸發時間搞錯。

covid19 gcp pipeline1 - 建立自己的 COVID-19 儀表板,透過 Google Cloud 搞定整條資料管線
Cloud Scheduler 的設定頁面

2. Cloud Pub/Sub

Cloud Pub/Sub 是用來處理發布與訂閱這種非同步事件的服務。以日常生活為例子,可以想像你訂閱追蹤某個 YouTube 頻道,等到那個頻道有新影片上架的時候,你就會收到頻道主動推播的訊息。

簡單的說,這裡就是要讓 Cloud Function 去訂閱 Cloud Scheduler。

covid19 gcp pipeline4 - 建立自己的 COVID-19 儀表板,透過 Google Cloud 搞定整條資料管線
Cloud Pub/Sub 的設定頁面

3. Cloud Function

Cloud Function 就是實際工作的主角,我們把寫好的 Python 程式放上去,讓他工作。

只要先把前面的步驟串起來,就可以去 Cloud Scheduler 那邊點「立即執行」來觸發 Cloud Function 測試自己的程式是不是正常運作。當然,也可以直接用 Cloud Function 本身自帶的測試功能,相關的除錯訊息也都看的到。

covid19 gcp pipeline3 - 建立自己的 COVID-19 儀表板,透過 Google Cloud 搞定整條資料管線
Cloud Function 的設定頁面

前面提到,我們要把人流資料與確診數做關聯,因此這邊會分別向兩個資料來源下載 CSV 檔案,然後根據國家來做合併。兩個資料來源分別是:

資料下載與合併 程式碼

因為程式碼也沒有很多,就直接在這邊分享給大家。這邊有幾個重點稍微說明一下:

  • 在資料合併時我做了簡化,只根據國家做對應,所以每個國家內的所有區域資料將會全部合併,以至於目前的篩選器最多只能塞選國家,沒辦法篩選區域。
  • 兩份資料的日期格式與國家名稱有些許不同,我做了一個簡單對應。

requirements.txt

pandas==1.0.3
numpy==1.18.4
requests==2.23.0
google-cloud-storage==1.28.1

main.py

from google.cloud import storage
import pandas as pd
import numpy as np
import datetime
import requests
import csv
import json
import os

mobility_report_url = 'https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv'
confirmed_global_url = 'https://data.humdata.org/hxlproxy/api/data-preview.csv?url=https%3A%2F%2Fraw.githubusercontent.com%2FCSSEGISandData%2FCOVID-19%2Fmaster%2Fcsse_covid_19_data%2Fcsse_covid_19_time_series%2Ftime_series_covid19_confirmed_global.csv&filename=time_series_covid19_confirmed_global.csv'
deaths_global_url = 'https://data.humdata.org/hxlproxy/api/data-preview.csv?url=https%3A%2F%2Fraw.githubusercontent.com%2FCSSEGISandData%2FCOVID-19%2Fmaster%2Fcsse_covid_19_data%2Fcsse_covid_19_time_series%2Ftime_series_covid19_deaths_global.csv&filename=time_series_covid19_deaths_global.csv'
recovered_global_url = 'https://data.humdata.org/hxlproxy/api/data-preview.csv?url=https%3A%2F%2Fraw.githubusercontent.com%2FCSSEGISandData%2FCOVID-19%2Fmaster%2Fcsse_covid_19_data%2Fcsse_covid_19_time_series%2Ftime_series_covid19_recovered_global.csv&filename=time_series_covid19_recovered_global.csv'
output_path = "/tmp/output/output.csv"

def merge_global_data_row(data_input):
    aggregation_functions = {'Country/Region': 'first'}
    for column in data_input.columns[4:]:
        aggregation_functions[column] = 'sum'
    data_output = data_input.groupby(data_input['Country/Region']).aggregate(aggregation_functions)
    return data_output

def merge_mobility_report_row(data_input):
    aggregation_functions = {'country_region': 'first', 'date': 'first'}
    for column in data_input.columns[5:]:
        aggregation_functions[column] = 'sum'
    data_output = data_input.groupby(['country_region', 'date']).aggregate(aggregation_functions)
    return data_output

def load_csv_from_url(url):
    df = pd.read_csv(url)
    return df

def download_csv(url, filename):
    df = pd.read_csv(url)
    df.to_csv(filename, index=False)

def convert_date_format(date_list):
# convert the date format from %m/%d/%y to %Y/%m/%d
    output = []
    for item in date_list:
        datetime_obj = datetime.datetime.strptime(item, '%m/%d/%y')
        output.append(datetime_obj.strftime('%Y/%m/%d'))
    return output

def mapping_contry_name(name):
    if name == 'Taiwan':
        return 'Taiwan*'
    elif name == 'United States':
        return 'US'
    elif name == 'South Korea':
        return 'Korea, South'
    else:
        return name

def process(event, context):

    # load files
    data_mobility_report = load_csv_from_url(mobility_report_url)
    data_confirmed = merge_global_data_row(load_csv_from_url(confirmed_global_url))
    data_deaths =  merge_global_data_row(load_csv_from_url(deaths_global_url))
    data_recovered =  merge_global_data_row(load_csv_from_url(recovered_global_url))

    # merge data
    data_mobility_report_group = merge_mobility_report_row(data_mobility_report)
    data_mobility_report_group = data_mobility_report_group.reset_index(drop=True)
    case_data_contry_list = data_confirmed['Country/Region'].tolist()
    data_output = None

    for contry_name, row in data_mobility_report_group.groupby(['country_region']):
        mname = mapping_contry_name(contry_name)

        total = data_confirmed.iloc[:,25:].shape[1]
        diff = total - row.shape[0]

        table = np.zeros((diff, row.shape[1]))
        df = pd.DataFrame(table, dtype=np.int, columns=row.columns)
        df['country_region'] = contry_name
        df['date'] = convert_date_format(list(data_confirmed.columns[-diff:]))
        df2 = pd.concat([row, df], axis=0)

        if mname in case_data_contry_list:
            data_confirmed_pick = data_confirmed.loc[data_confirmed['Country/Region'] == mname].iloc[:,25:].T[mname].values
            data_deaths_pick = data_deaths.loc[data_confirmed['Country/Region'] == mname].iloc[:,25:].T[mname].values
            data_recovered_pick = data_recovered.loc[data_confirmed['Country/Region'] == mname].iloc[:,25:].T[mname].values
            print(contry_name, row.shape, data_confirmed_pick.shape)
        else:
            data_confirmed_pick = np.zeros(total)
            data_deaths_pick = np.zeros(total)
            data_recovered_pick = np.zeros(total)

        df2['confirm_count'] = data_confirmed_pick
        df2['deaths_count'] = data_deaths_pick
        df2['recovered_count'] = data_recovered_pick
        if data_output is None:
            data_output = df2
        else:
            data_output = data_output.append(df2)

    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    data_output.to_csv(output_path, index = False, header=True)

    client = storage.Client()
    bucket = client.get_bucket('bucket-name')
    blob = bucket.blob('output.csv')
    blob.upload_from_filename(filename=output_path)

4. Cloud Storage

這個就是儲存檔案的空間,開設一個自己的 bucket。

題外話,最近聽到 AWS 的課程在教 S3,中文的教學是直接把 bucket 翻譯成「桶」哈哈,從這個私有桶到公有桶甚麼的,聽起來怪饒口的。

covid19 gcp pipeline2 - 建立自己的 COVID-19 儀表板,透過 Google Cloud 搞定整條資料管線
Cloud Storage 的設定頁面

5. Google Data Studio

最後一步,就是設定 Data Studio 去吃 Cloud Storage 來源的資料,設定完成拖拉幾下就可以建置好報表。關於 Data Studio 的介紹,可以參考我之前的文章:

使用 Google Data Studio 數據分析工具,輕鬆打造 Google Analytics 視覺化報表

最後的成果就如下圖所示,有興趣的話,可以點擊下方的連結前往觀看。如果資料有誤,歡迎在留言區告訴我,我會盡快修補問題。

傳送門:COVID-19 移動趨勢報表 (已下架)
covid19 data1 - 建立自己的 COVID-19 儀表板,透過 Google Cloud 搞定整條資料管線
選擇國家與地區能夠調整資料範圍

這樣花費多少呢?

串了那麼多服務,要花到錢嗎?答案是:不用啦!

我們來盤點一下,目前有用到的服務們的免費額度有多少,詳細還是要參考 GCP 官方的文件。

  • Cloud Scheduler:每個 Google 帳單帳戶每個月可免費建立 3 項工作。
  • Cloud Pub/Sub:系統會根據擷取和傳送訊息作業每月所傳輸的資料量計費,前 10 GB 的用量免費。
  • Cloud Function:每月前 200 萬次叫用免費。免費方案也提供 400,000 GB/秒和 200,000 GHz/秒的運算時間,以及每月 5 GB 的網際網路輸出流量。
  • Cloud Storage:每月 5 GB 的免費用量。因為每天只做一次事情,A 與 B 級作業的額度都不會超過。
  • Google Data Studio:不用錢 YA

總結

很開心完成了一個簡單的 Side Project,也對於 GCP 上面的這幾個服務有更深入的了解。

不過跟 AWS 相比之下,Google Cloud 的功能顯得實在陽春許多,還要加把勁才能追上老大哥 TAT

真的要說的話,我還是比較喜歡 Google Data Studio,跟打對台的 Amazon QuickSight 相比,Google 的 UI 呈現還是深得我心,做出來的成品看的愉悅啊 XDD

Jerry
Jerry

樂於分享的軟體工程師,曾在新創與大型科技公司實習,獲得黑客松競賽冠軍,擔任資安研討會講者。長期熱衷於資訊安全、雲端服務、網路行銷等領域,希望將科技知識分享給更多人。內容轉載請來信:jlee58tw@gmail.com

6 則留言

  1. Jerry你好:
    請問function有這個error,要怎麼辦?
    TypeError: merge_global_data_row() takes 1 positional argument but 2 were given

    • Hi 家維,

      這個錯誤是參數數量不對,可以分享您完整的程式碼嗎?
      目前我簡單測試看起來沒有問題

      Regards,
      Jerry

    • Hello 小妍,

      謝謝你的回應,這篇文章當作是我的學習兼紀錄,供大家參考。
      如果程式不能動,有可能是資料來源的欄位有更新,導致爬蟲程式執行失敗。
      建議你可以先看一下錯誤訊息,我們來看看是哪邊有問題,然後進行調整。

      Regards,
      Jerry

    • Hello 學長好久不見 🙂
      GCP 真的是介面取勝哈哈
      我也是習慣用他xdd

發表回應