気軽に楽しくプログラムと遊ぶ

自分が興味があってためになるかもって思う情報を提供しています。

MWAAとは?

概要

Amazon Managed Workflows for Apache Airflow の略称。(AirflowのAmazonのManagedサービス)

利用メリット

  • マネージドサービスで管理が楽
  • Dag、Pligin、RequirementsはS3バケットで管理
  • バッチの可視化が可能で、簡単にON/OFFや起動が可能

Apache AirFlow

  • ワークフロー管理ツール
  • ワークフローをコードで書き、実行スケジューリング、モニタリングを行うプラットフォーム
  • DAG(Directed Acyclic Graph)
    • 依存関係のある処理同士を繋いだものをDAG(有向非巡回グラフ)と呼び、ワークフローを作成する
    • DAG(有向非巡回グラフ)は、一つの方向に情報が流れるグラフ
    • 実装言語はPython

Amazon MWAA UI

作成した Airflow UI に飛ぶと以下のような画面が参照できる

  • 一番左のトグル:DAGのON/OFF
  • Rans:緑丸が成功回数、赤丸が失敗回数
  • Schedule:起動タイミング(cron形式 or 日付形式で入力)
  • Action:手動実行

その他、Linksで、DAGのコードや、以下のDAG実行カレンダーなどが見れる

MWAAを一覧に反映させる

  • MWAA用のS3にPythonファイルを配することで反映
  • GitHubでソース管理し、GitHub ActionsなどでS3に反映などすると良い

DAGの実装

基本的なタスク

  • Operators: Airflowのタスクテンプレ
    • BashOperator、AWSサービスのOperatorなどなど
  • Sensors: 他のタスクのイベントに応じて使用するタスク
  • @task: Pythonでタスクを記述

実装例は以下。

with DAG(
        dag_id="batch-check",
        default_view="graph",
        start_date=datetime(2023, 6, 9, hour=1, tzinfo=LOCAL_TZ),
        schedule_interval='0 1 * * *',
        max_active_runs=1,
        catchup=False,
        tags=TAGS,
        on_failure_callback=send_slack(),
) as dag:
    run_task = run_ecs_task("CheckJob")
    task_finish = task_finish(run_task.output["ecs_task_arn"])
    chain(
        run_task,
        task_finish
    )

公式サイト