レック・テクノロジー・コンサルティング株式会社TECH BLOG

Snowpipe x Azure Event Grid で自動データロードを試してみる

Snowpipe は Snowflake で提供されているデータロード機能です。ステージ(データ取り込み対象のストレージ)でファイルが利用可能になったタイミングでデータロードを行うことができます。

COPY コマンドを手動で実行する一括ロードと違い、クラウドストレージのイベント通知や REST API による自動ロードを実現できるという点が特徴です。

本記事では Snowpipe と Azure Event Grid を利用してデータロードを自動化する方法についてご紹介します。

※操作はすべてACCOUNTADMINロールで行っています。

設定手順

設定手順は以下の通りです。

1. テーブル準備

Snowflake のワークシートで以下のSQLを実行し、テーブルemployeesの作成とロード用のCSVファイル形式csv_formatの作成を行います。

-- employeesテーブル作成
CREATE OR REPLACE TABLE employees (
  employee_id STRING,
  first_name STRING,
  last_name STRING,
  email STRING,
  department STRING,
  salary INTEGER
);

-- ファイル形式作成
CREATE OR REPLACE FILE FORMAT csv_format
  TYPE = 'CSV'
  FIELD_DELIMITER = ','
  SKIP_HEADER = 0;

2. ストレージへのアクセス構成

こちらの記事を参考に、以下の設定を行います。

  • ストレージアカウント作成
  • ストレージ統合(azure_int)作成
  • 外部ステージ(ext_stage_azure)作成

3. Event Grid サブスクリプション構成

Azure ポータルでストレージアカウントコンソールを開き、ファイルロード対象のストレージアカウントの「イベント」で「+イベントサブスクリプション」をクリックします。

image.png

基本設定で以下の項目を設定します。

  • 名前:イベントサブスクリプションの名前
  • システムトピック名:イベント送信先となるシステムトピックの名前
  • イベントの種類のフィルター:送信対象のイベント。今回はファイルのアップロード時にイベントを送信したいため、「Blob Created」を選択。
  • エンドポイントの種類:「ストレージキュー」を選択
  • エンドポイント:「エンドポイントの構成」をクリックして設定

image-1.png

エンドポイントの構成は以下のように設定します。

image-2.png

その他はデフォルト設定のままで「作成」をクリックします。

4. 通知統合作成

Snowflake で以下のSQLを実行します。

CREATE NOTIFICATION INTEGRATION azure_notification_int
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
  AZURE_STORAGE_QUEUE_PRIMARY_URI = '<queue_URL>'
  AZURE_TENANT_ID = '<tenant_ID>';

<queue_URL>は対象ストレージアカウントの「データストレージ」→「キュー」を開き、上記で設定したキューのURLの値を設定します。

image-3.png

<tenant_id>は Azure ポータルから「Microsoft Entra ID」を開き、概要の「テナントID」で確認できます。

image-4.png

5. ストレージキューへのアクセス許可

Snowflake で以下のSQLを実行します。

DESC NOTIFICATION INTEGRATION azure_notification_int;

image-5.png

実行結果のAZURE_CONSENT_URLのリンクをクリックします。

アクセス許可画面にリダイレクトされるので、「承諾」をクリックします。
これにより、Snowflake がストレージキューにアクセスするためのアプリケーションが作成されます。

次に、アプリケーションにストレージキューに対するロールを付与します。

対象ストレージキューの「アクセス制御(IAM)」を開き、「追加」をクリックします。

image-6.png

ロールの割り当ての追加で、「ストレージキューデータ共同作成者」を選択し「次へ」をクリックします。

image-7.png

「メンバーを選択する」をクリックし、AZURE_MULTI_TENANT_APP_NAMEに記載のアプリケーションを選択します。

image-8.png

「レビューと割り当て」をクリックして完了です。

6. パイプ作成

Snowflake で以下のSQLを実行します。

CREATE PIPE test_pipe
  AUTO_INGEST = true
  INTEGRATION = 'AZURE_NOTIFICATION_INT'
  AS
    COPY INTO employees
      FROM @ext_stage_azure
      FILE_FORMAT = csv_format;

test_pipeという名前のパイプを作成するコマンドです。AS句以降に指定した COPY コマンドが、AZURE_NOTIFICATION_INTから通知を受け取ったタイミングで実行されます。

作成したパイプの詳細はデータベース管理画面から確認することができます。

image-9.png

以上でパイプの設定は完了です。

動作確認

以下のemployeesテーブルに追加データをロードしてみます。

image-10.png

Azure ポータルから対象ストレージアカウントのコンテナにCSVファイルemployees_2.csvをアップロードします。

image-11.png

CSVファイルの内容は以下の通りです。

1006,David,Wilson,david.wilson@example.com,Engineering,72000
1007,Susan,Martinez,susan.martinez@example.com,Marketing,68000
1008,Robert,Garcia,robert.garcia@example.com,Sales,71000
1009,Linda,Anderson,linda.anderson@example.com,HR,59000
1010,James,Thomas,james.thomas@example.com,Finance,82000

アップロード後、パイプが作動してテーブルにデータが取り込まれたことを確認できました。

image-12.png

さいごに

Snowpipe を利用すると、特定の場所にアップロードされたファイルのデータを自動的にテーブルに取り込むことができます。

パイプによりデータロードが自動化できることに加え、イベント通知やAPIリクエストがあったタイミングのみパイプが実行されるため運用コストも最小限に抑えることができます。

今回ご紹介したのは Azure の場合のみですが、AWS や Google Cloud でも同様にデータロードを自動化することが可能です。お使いの環境や要件に応じて、利用を検討してみてください。

参考

この記事をシェアする

  • Facebook
  • X
  • Pocket
  • Line
  • Hatena
  • Linkedin

資料請求・お問い合わせはこちら

ページトップへ戻る