六合彩AI預測賽後檢討!邊個模型預測最叻? 每次攪珠當晚10:30更新免費睇!

AWS快速建構高效ETL資料管道教學!

✈️ Trip.com「內地快閃」機票+酒店半價

【每週二 10 AM】 立即領取 半價優惠代碼
最高減 HK$500,CP 值極高,先到先得!


立即搶優惠 🔗

由配置到協調:用AWS建立ETL數據流程,唔再係苦差!

AWS雲端與數據工程

AWS作為雲端市場的領頭羊,憑住早入市、技術成熟同服務多元,穩佔32%市場份額。不過,唔少用家都覺得AWS難用,甚至因為操作複雜而轉投Microsoft Azure或者Google Cloud Platform。

雖然AWS學習曲線高、介面唔夠直覺,但佢嘅可靠性、混合雲支援同服務選擇多,依然係業界首選。最重要嘅係,只要策略得宜,其實可以大大減低配置難度、加快工作流程、提升效能。

今次我會以自己經驗,逐步示範點樣用AWS建立一套完整、可自動化嘅ETL(抽取、轉換、載入)數據流程,等你唔再因為AWS而頭痕,就算第一次用AWS做數據工程都可以輕鬆上手。

設計高效數據流程嘅策略

AWS生態圈夠大,功能齊全。要建好一個可以投產嘅數據倉庫,基本上要用到以下服務:

– IAM:雖然唔直接參與流程,但係所有服務嘅存取權限都靠佢。
– AWS S3:數據湖儲存。
– AWS Glue:ETL數據處理。
– Amazon Redshift:數據倉庫。
– CloudWatch:監控同記錄。

如果你要排更複雜嘅工作流,或者要進階錯誤重試,Airflow都會係好幫手(雖然Redshift都支援簡單排程)。

想工作更順利,我建議裝個IDE(Visual Studio Code、PyCharm或者你自己鍾意嗰個),方便寫Python、做本地測試、版控同團隊協作。下一步我會逐步教你點設定。

初步設置

初步設定步驟如下:

– 用IDE開個虛擬環境。
– 安裝依賴,主要用以下指令:

pip install apache-airflow==2.7.0 boto3 pandas pyspark sqlalchemy

– 裝AWS CLI,方便寫script自動化管理AWS資源。
– AWS CLI設定,輸入IAM用戶嘅Access Key、Secret Key、預設Region(如us-east-1)、預設輸出格式(json)。
– Airflow整合步驟:
– 初始化Airflow
– 喺Airflow建立DAG文件
– 開Web Server(http://localhost:8080,預設admin/admin登入)
– 開另一個Terminal啟動Scheduler

export AIRFLOW_HOME=$(pwd)/airflow
airflow db init
airflow users create
–username admin
–password admin
–firstname Admin
–lastname User
–role Admin
–email admin@example.com

airflow webserver –port 8080

airflow scheduler

開發流程示範:COVID-19數據案例

我用約翰霍普金斯大學(JHU)公開COVID-19數據集做例子(數據連結按此)。

以下係由數據收集到Redshift表格嘅開發環境流程圖:

開發流程圖

數據收集

第一步,將數據拉落AWS S3。我將數據轉做long format,日期格式都轉咗,最後用parquet格式儲存,咁做可以慳位、查詢快啲、成本低啲。主要Python code如下:

import pandas as pd
from datetime import datetime
import os
import boto3
import sys

def process_covid_data():
try:
url = “https://github.com/CSSEGISandData/COVID-19/raw/master/archived_data/archived_time_series/time_series_19-covid-Confirmed_archived_0325.csv”
df = pd.read_csv(url)
df = df.melt(
id_vars=[‘Province/State’, ‘Country/Region’, ‘Lat’, ‘Long’],
var_name=’date_str’,
value_name=’confirmed_cases’
)
df[‘date’] = pd.to_datetime(
df[‘date_str’],
format=’%m/%d/%y’,
errors=’coerce’
).dropna()
output_dir = “covid_processed”
df.to_parquet(
output_dir,
engine=’pyarrow’,
compression=’snappy’,
partition_cols=[‘date’]
)
s3 = boto3.client(‘s3’)
total_files = 0
for root, _, files in os.walk(output_dir):
for file in files:
local_path = os.path.join(root, file)
s3_path = os.path.join(
‘raw/covid/’,
os.path.relpath(local_path, output_dir)
)
s3.upload_file(
Filename=local_path,
Bucket=’my-dev-bucket’,
Key=s3_path
)
total_files += len(files)
print(f”Successfully processed and uploaded {total_files} Parquet files”)
print(f”Data covers from {df[‘date’].min()} to {df[‘date’].max()}”)
return True
except Exception as e:
print(f”Error: {str(e)}”, file=sys.stderr)
return False

if __name__ == “__main__”:
process_covid_data()

執行完之後,你會喺S3 bucket嘅‘raw/covid/’見到parquet檔案。

S3數據檔案

ETL流程開發

AWS Glue主要負責ETL流程。雖然佢都可以做數據收集,但真本事係S3入面做數據倉庫轉換。以下係PySpark轉換腳本:

from awsglue.context import GlueContext
from pyspark.sql.functions import *

glueContext = GlueContext(SparkContext.getOrCreate())
df = glueContext.create_dynamic_frame.from_options(
“s3”,
{“paths”: [“s3://my-dev-bucket/raw/covid/”]},
format=”parquet”
).toDF()
df_transformed = df.withColumn(“load_date”, current_date())
df_transformed.write.parquet(
“s3://my-dev-bucket/processed/covid/”,
mode=”overwrite”
)

Glue ETL流程

下一步係將數據load入Redshift。喺Redshift Console,揀「Query Editor Q2」,可以編輯SQL同埋做COPY。

CREATE TABLE dev.covid_data (
“Province/State” VARCHAR(100),
“Country/Region” VARCHAR(100),
“Lat” FLOAT8,
“Long” FLOAT8,
date_str VARCHAR(100),
confirmed_cases FLOAT8
)
DISTKEY(“Country/Region”)
SORTKEY(date_str);

COPY dev.covid_data (
“Province/State”,
“Country/Region”,
“Lat”,
“Long”,
date_str,
confirmed_cases
)
FROM ‘s3://my-dev-bucket/processed/covid/’
IAM_ROLE ‘arn:aws:iam::your-account-id:role/RedshiftLoadRole’
REGION ‘your-region’
FORMAT PARQUET;

數據會成功上到Redshift數據倉庫。

Redshift數據表

流程自動化

最簡單自動化方法係喺Redshift query editor v2排定期任務,寫Stored Procedure(詳細可以睇呢篇)。

CREATE OR REPLACE PROCEDURE dev.run_covid_etl()
AS $$
BEGIN
TRUNCATE TABLE dev.covid_data;
COPY dev.covid_data
FROM ‘s3://simba-dev-bucket/raw/covid’
IAM_ROLE ‘arn:aws:iam::your-account-id:role/RedshiftLoadRole’
REGION ‘your-region’
FORMAT PARQUET;
END;
$$ LANGUAGE plpgsql;

Redshift自動化流程

又或者可以用Airflow排定期工作:

from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

default_args = {
‘owner’: ‘data_team’,
‘depends_on_past’: False,
‘start_date’: datetime(2023, 1, 1),
‘retries’: 2
}

with DAG(
‘redshift_etl_dev’,
default_args=default_args,
schedule_interval=’@daily’,
catchup=False
) as dag:

run_etl = RedshiftSQLOperator(
task_id=’run_covid_etl’,
redshift_conn_id=’redshift_dev’,
sql=’CALL dev.run_covid_etl()’,
)

投產環境流程

如果你嘅ETL流程有好多依賴,Airflow DAG可以幫你統籌晒成個流程,亦係業界投產標準做法。

開發同測試完ETL之後,可以用Airflow自動化投產工作:

投產流程圖

主要部署步驟:

– 建立S3 bucket:`my-prod-bucket`
– AWS Console開Glue job:`prod_covid_transformation`
– Redshift建立Stored Procedure:`prod.load_covid_data()`
– 設定Airflow
– `airflow.cfg`設定SMTP發電郵

Airflow部署腳本如下:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.operators.email import EmailOperator

default_args = {
‘owner’: ‘data_team’,
‘retries’: 3,
‘retry_delay’: timedelta(minutes=5),
‘start_date’: datetime(2023, 1, 1)
}

def load_covid_data():
import pandas as pd
import boto3
url = “https://github.com/CSSEGISandData/COVID-19/raw/master/archived_data/archived_time_series/time_series_19-covid-Confirmed_archived_0325.csv”
df = pd.read_csv(url)
df = df.melt(
id_vars=[‘Province/State’, ‘Country/Region’, ‘Lat’, ‘Long’],
var_name=’date_str’,
value_name=’confirmed_cases’
)
df[‘date’] = pd.to_datetime(df[‘date_str’], format=’%m/%d/%y’)
df.to_parquet(
‘s3://my-prod-bucket/raw/covid/’,
engine=’pyarrow’,
partition_cols=[‘date’]
)

with DAG(
‘covid_etl’,
default_args=default_args,
schedule_interval=’@daily’,
catchup=False
) as dag:

ingest = PythonOperator(
task_id=’ingest_data’,
python_callable=load_covid_data
)
transform = GlueJobOperator(
task_id=’transform_data’,
job_name=’prod_covid_transformation’,
script_args={
‘–input_path’: ‘s3://my-prod-bucket/raw/covid/’,
‘–output_path’: ‘s3://my-prod-bucket/processed/covid/’
}
)
load = RedshiftSQLOperator(
task_id=’load_data’,
sql=”CALL prod.load_covid_data()”
)
notify = EmailOperator(
task_id=’send_email’,
to=’you-email-address’,
subject=’ETL Status: {{ ds }}’,
html_content=’ETL job completed: View Logs
)

編輯部評論:AWS數據工程的真正價值與挑戰

AWS一直俾人感覺難入門,原因唔止係服務多、選擇多,仲有每個服務之間嘅權限、設定、連接都要逐步摸索。呢篇文章最大價值,係用實戰例子(COVID-19數據)帶你一步步由本地開發到雲端投產,展示咗「數據工程唔只係寫code,仲要玩得掂自動化、監控、協調同安全」。

對香港或者亞洲市場嘅啟示係:唔好只睇AWS「難」,而係要諗點樣善用佢嘅彈性同可擴展性,尤其係對於需要大規模、可靠、合規數據處理嘅企業。AWS服務之間嘅組合,等你可以由小型POC一路scale up去真正生產級應用,呢點係本地傳統IT架構完全無得比。

不過,AWS最大挑戰其實唔係技術,而係團隊嘅學習成本同協作文化。數據工程唔係單打獨鬥,係要DevOps、數據科學、業務分析一齊玩,所以文章建議用IDE、用Airflow、用版本管理,全部都係現代數據團隊必備技能。

最後,AWS雲端數據工程唔再係「一個人搞掂晒」,而係要有規模、有流程、有自動化、有監控。只要肯投資時間學習,呢套技術同思維會令你做數據工程時更有底氣,無論你將來轉用Azure、GCP,甚至阿里雲,條路都會行得更穩。

總結:AWS雲端數據工程,難係難,但只要肯試、肯學、肯自動化,佢係你數據事業不可或缺嘅底層武器。

🎬 YouTube Premium 家庭 Plan成員一位 只需 HK$148/年

不用提供密碼、不用VPN、無需轉區
直接升級你的香港帳號 ➜ 即享 YouTube + YouTube Music 無廣告播放

立即升級 🔗