GCP版Dataformで冪等性を担保する設計ポイント3つ

データエンジニアの遠藤です。

TVer Advent Calendar 2023の24日目の記事になります。

はじめに

本年(2023年)、Google Cloudのビッグデータ基盤として展開されるBigQueryでは、データガバナンスツールであるDataformがGA(Generally Avaialble)になりました。

cloud.google.com

このDataformの登場により、BigQuery上でデータを利活用しやすいように変換する(データマートを生成する)システムの構築が容易になりました。

本記事では、Dataform上において、定常実行やリトライ実行を容易にするために、冪等性が担保される設計のテクニックを3点紹介します。(Dataformの基本的な使い方については触れませんのでご注意ください)

1. SQLX内のクエリに変数を用いる

DataformはSQLXと呼ばれるファイルでデータ変換処理内容を管理します。 SQLXファイルにはデータ更新設定とクエリを記載しますが、クエリ中で変数を利用することができます。

まず、 includes/dataform.jsonvars 内でSQLXファイル内で用いる変数を定義します。

{
  "warehouse": "bigquery",
  "defaultDatabase": "dev-project",
  "defaultLocation": "us-central1",
  "vars": {
    "certain_date": "2023-01-01"
  }
}

dataform.jsonで定義した変数は以下のようにSQLXのクエリ内に表記します(以下の例ではWHERE句の中でdataform.jsonで定義した変数 certain_date を用いるように設定しています)。

config {
    type: "table",
    schema: "result_dataset",
    name: "result_table",
}

SELECT column_a,
       column_b,
       column_c,
       partition_column
FROM ${ref("source_table")}
WHERE partition_column = '${ dataform.projectConfig.vars.certain_date }'

これにより、パラメータによって可変になるクエリ表現が可能になります。

2. 定期的な実行はDataform APIから行う

一般的に、Google Cloud上のDataformは以下の方法で実行することができます。

  • ワークスペース内のGUIで「実行を開始」をクリックする
  • リリース構成・ワークフロー構成を設定する
  • Dataform APIにジョブ実行リクエストを送る

処理を冪等にするためには、SQLX内の変数のようなパラメータを毎回変えながら定期的に実行する仕組みが必要です。

この要件を十分に満たすには、上記で3番目に挙げた実行方法「Dataform APIからの実行」が最適です。

なぜなら、「Dataform APIからの実行」は、APIでのリクエスト情報を適切に設定することで、dataform.json内の設定をオーバーライドして実行することが可能であるからです。

cloud.google.com

「Dataform APIからの実行」は、以下の2つをDataform APIで処理することで実現します。

まず、コンパイル結果を作成するため、以下のDataform API「compilationResults.create」を実行します。

cloud.google.com

API「compilationResults.create」は、「CodeCompilationConfig」オブジェクトに以下の情報を設定することで、「CodeCompilationConfig」内の設定情報を優先しながらコンパイル結果を作成します。

  • 出力結果格納先BigQueryプロジェクト(defaultDatabase)
  • 出力結果格納先データセット名の接尾辞(schemaSuffix)
  • 出力結果格納先テーブル名の接頭辞(tablePrefix)
  • 変数(vars)

このAPIリクエストでは、コンパイル結果が正常に作成されると、compilationResult というコンパイル結果IDが返されます。

次に、このコンパイル結果IDを用いて、以下のDataform API「workflowInvocations.create」を実行すると、作成したコンパイル結果でジョブを実行します。

cloud.google.com

このように、「Dataform APIからの実行」は、パラメータ制御の自由度が高くなるため、backfill実行も容易に行うことが可能です。

なお、Google Cloudでは、Dataform APIによる一連の実行はCloud Composer・Cloud Workflowsといった他のジョブ管理ツールから行うように推奨されています。

3. クエリ結果を積み上げる場合はpre_operations処理を追加する

Dataformにおける出力結果の格納方法は以下の方法が可能です(SQLXのconfigで設定します)。

  • VIEW化(結果の出力はせずにVIEWで設定するのみにとどめる)
  • 洗い替え(格納先のテーブルにデータが存在する場合、上書きして更新する)
  • 積み上げ(格納先のテーブル上の既存データはそのままにしながら、クエリ結果を新たに追加する)

結果格納の設定が「積み上げ」の場合、同じジョブが複数回実行されると、出力結果が重複して格納されてしまいます。

そのため、ジョブを複数回実行しても出力結果が冪等になるためには、SQLX内に新規にpre_operations項目を設けてそこにDELETE文を設定することで解決します。

config {
    type: "incremental",
    schema: "result_dataset",
    name: "result_incremental_table",
}

pre_operations {
    DELETE FROM ${self()} WHERE partition_column = '${ dataform.projectConfig.vars.certain_date }'
}

SELECT column_a,
       column_b,
       column_c,
       partition_column
FROM ${ref("source_table")}
WHERE partition_column = '${ dataform.projectConfig.vars.certain_date }'

pre_operations内のDELETE文は、1回目の実行では何も影響がないですが、2回目以降の実行では重複を避ける処理として効果を発揮します。

おわりに

本記事では、GCP版Dataform上に載せるシステムにおいてジョブが冪等になるためのポイント3点を紹介させていただきました。

  1. SQLX内のクエリに変数を用いる
  2. 定期的な実行はDataform APIから行う
  3. クエリ結果を積み上げる場合はpre_operations処理を追加する

以上の3点を考慮すれば、Dataform運用の効率性が高まるかと思いますので、ぜひ参考にしてみてください。