[python] 使用 Spark 與 Hive 進行 ETL

之前實作 ETL 系統是透過 Python + MongoDB/MySQL 完成,對於少量的資料綽綽有餘,但如果想處理大量資料,又想要借用 Spark MLlib 機器學習套件的話,那麼就使用 PySpark + Hive 來達成任務吧。能使用熟悉的 Python 與 SQL 語法,無痛轉移。


Spark 與 Hive 環境設定

這裡透過 Google Cloud Dataproc 架設,Cluster 跑起來之後全部套件都有囉,包含 Hadoop 2.7.3,Spark 2.0.2 與 Hive 2.1.1 ,而為了要使用 PySpark,記得加上環境變數。

$ export SPARK_HOME=/usr/lib/spark
$ export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/build:$PYTHONPATH

使用 PySpark 來進行 ETL

spark hive flow - [python] 使用 Spark 與 Hive 進行 ETL

ETL 的流程就如上圖三步驟。我們想要抓取的資料是行政院農委會提供的 農村地方美食小吃特色料理,內容是農村特色料理的所在位置與店家資訊,反正內容是什麼不重要,重要的是希望程式能直接透過 HTTP Request 抓下來,經過轉換後丟進 Hive 資料庫。 由於 Spark 2.0 版本加入了 Spark Session,使用上變得簡潔,不過想要與 Hive溝通,記得要加上 enableHiveSupport() 才能存取資料喔。另外,Dataproc 上的 Python 版本為 2.7,處理中文會比較麻煩,如果要換成 Python 3,記得 Cluster環境下只改 Master是不行的,必須要在裝環境的時候,將 所有機器上的 PySpark 預設改掉。詳細作法可以參考這篇。這裡仍然使用 Python 2.7,詳細的程式碼與說明如下:

# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
import requests
import json

class SparkHiveExample:

    def __init__(self):
        ## initialize spark session
        self.spark = SparkSession.builder.appName("Spark Hive example").enableHiveSupport().getOrCreate()

    def run(self):
        ## download with opendata API
        url = "http://data.coa.gov.tw/Service/OpenData/ODwsv/ODwsvTravelFood.aspx?"
        data = requests.get(url)

        ## convert from JSON to dataframe
        df = self.spark.createDataFrame(data.json())

        ## display schema
        df.printSchema()

        ## creates a temporary view using the DataFrame
        df.createOrReplaceTempView("travelfood")

        ## save into Hive
        self.spark.sql("DROP TABLE IF EXISTS travelfood_hive")
        df.write.saveAsTable("travelfood_hive")

        ## use SQL
        sqlDF = self.spark.sql("SELECT * FROM travelfood_hive WHERE City == '屏東縣'")
        sqlDF.select("Name", "City", "Town", "Coordinate").show()

if __name__ == "__main__":
    EXAMPLE = SparkHiveExample()
    EXAMPLE.run()

從 Requests 取得的 JSON 格式資料,使用 createDataFrame 就會變成 Spark的 DataFrame 格式,也因為這個資料集的 Key 只有一個層級,轉換不用特別處理,你能透過 printSchema 看見所有的欄位名稱。 使用 createOrReplaceTempView 會產生一個暫時的 View,就能透過 SQL 語法存取資料了,但注意資料還沒有送進 Hive 喔,只是方便你處理而已,必須用 DataFrame 的函式 saveAsTable 才算完成。剩下的 SQL 操作,看看就懂了吧!附上執行結果:

spark hive result - [python] 使用 Spark 與 Hive 進行 ETL


Youtube 上的教學影片

如果使用的語言是 Scala,請參考這部影片的教學 (Spark 1.6)


參考資料

Jerry
Jerry

樂於分享的軟體工程師,曾在新創與大型科技公司實習,獲得黑客松競賽冠軍,擔任資安研討會講者。長期熱衷於資訊安全、雲端服務、網路行銷等領域,希望將科技知識分享給更多人。內容轉載請來信:jlee58tw@gmail.com

2 則留言

發表回應