Snowflake のメトリクスをモニタリングサービスで管理したいと思ったことはないでしょうか?
Snowflake には外部関数(External Functions)という仕組みがあり、Snowflake のメトリクスや監査ログを外部サービスに送信することができます。本記事では、Snowflake のメトリクスを CloudWatch に連携する方法についてご紹介します。
さっそく、以降で詳しい手順について見ていきましょう。
Lambda関数作成
Snowflake からデータを受け取り、CloudWatch にログを送信するためのLambda関数を作成します。
AWSでLambdaのコンソールを開き、関数を新規作成します。関数作成後のソースコードの編集画面で、以下のファイルを作成します。
lambda_function.py
import json
import datetime, decimal
import boto3
def default_json_transform(obj):
if isinstance(obj, decimal.Decimal):
return str(obj)
if isinstance(obj, (datetime.date, datetime.datetime)):
return obj.isoformat()
raise TypeError
def lambda_handler(event, context):
array_of_rows_to_return = []
status_code = 200
event_body = event["body"]
payload = json.loads(event_body)
for row in payload["data"]:
row_number = row[0]
metricName = row[1]
value = row[2]
unit = row[3]
nameSpace = row[4]
dimensionsArray = row[5]
dimensionDict = json.loads(dimensionsArray)
metricResponse = send_cloudwatch_metric(metricName, value, unit, nameSpace, dimensionDict)
row_to_return = [row_number, metricResponse]
array_of_rows_to_return.append(row_to_return)
json_compatible_string_to_return = json.dumps({"data" : array_of_rows_to_return}, default=default_json_transform)
return {
'statusCode': status_code,
'body': json_compatible_string_to_return
}
def send_cloudwatch_metric(metricName, value, unit, nameSpace, dimensionDict):
cloudwatch = boto3.client('cloudwatch')
response = cloudwatch.put_metric_data(
MetricData=[
{
'MetricName': metricName,
'Dimensions': dimensionDict,
'Unit': unit,
'Value': value
},
],
Namespace=nameSpace
)
return response
「Deploy」をクリックし、デプロイします。
また、自動作成されたLambda関数のロールに以下のログメトリクス送信用ポリシーを追加します。
snowflake-cloudwatch-log-policy
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"cloudwatch:PutMetricData"
],
"Resource": "*"
}
]
}
API Gateway作成
Snowflake はリモートサービス(Lambda)に直接データを送信するのではなく、データを中継するプロキシサービス(API Gateway)を介してデータを送信します。そのため、先ほど作成したLambda関数にデータを送信するための API Gateway を作成します。
IAMロール作成
Snowflake が認証に使用するIAMロールを作成します。
IAMを開き、「ロールを作成」をクリックします。信頼されたエンティティタイプは「AWSアカウント」、AWSアカウントは「このアカウント」を選択して次に進みます。
「許可を追加」は何も追加しなくて構いません。ロール名や説明は適当なものを設定し、「ロールを作成」をクリックします。
API Gatewayエンドポイント作成
エンドポイントにはリージョナルエンドポイントとプライベートエンドポイントの2種類あります。今回は特定リージョン内にアクセスを制限する要件はないため、リージョンやプラットフォームをまたいで利用可能なリージョナルエンドポイントを作成します。
API Gateway のコンソールを開き、APIの作成を行います。「REST API」の「構築」をクリックします。
「新しいAPI」で、エンドポイントタイプで「リージョン」を選択して作成します。
作成後の画面で、「メソッドを作成」をクリックします。
メソッドの詳細を以下の通り設定します。
- メソッドタイプ:POST
- 統合タイプ:Lambda関数
- Lambdaプロキシ統合:有効
- Lambda関数:上記で設定したLambda関数
※Lambdaプロキシ統合を有効化していないとJSONの形式が変わってしまうため、必ず有効化しておく必要があります。
設定が完了したら「メソッドを作成」をクリックします。
メソッドが作成されたら、「APIをデプロイ」をクリックします。
ステージは「新しいステージ」を選択し、適当なステージ名を設定して「デプロイ」をクリックします。
API Gatewayエンドポイント認証設定
メニューから「リソース」を開き、「メソッドリクエスト」の「編集」をクリックします。
認可を「AWS IAM」に変更して「保存」をクリックします。
メニューから「リソースポリシー」を開き、「ポリシーを作成」をクリックします。
編集画面に以下を貼り付けます。
{
"Version": "2012-10-17",
"Statement":
[
{
"Effect": "Allow",
"Principal":
{
"AWS": "arn:aws:sts::<12-digit-number>:assumed-role/<external_function_role>/snowflake"
},
"Action": "execute-api:Invoke",
"Resource": "<method_request_ARN>"
}
]
}
変数部分は以下の通り置き換えてください。
<12-digit-number>
:AWSのアカウントID<external_function_role>
:上記で作成したIAMロール名<method_request_ARN>
:APIのPOSTメソッドのARN
今回の設定例:
編集が完了したら「変更を保存」をクリックします。
更新したAPIを再度デプロイします。
API統合作成
Snowflake のワークシートで以下のSQLを実行します。
USE ROLE ACCOUNTADMIN;
CREATE OR REPLACE API INTEGRATION int_cloudwatch_api
API_PROVIDER = AWS_API_GATEWAY
API_AWS_ROLE_ARN = '<IAMロールARN>'
API_ALLOWED_PREFIXES = ('<APIの呼び出しURL>')
ENABLED = TRUE;
IAMロールARNは「API Gateway作成」で作成したロールです。
APIの呼び出しURLは「ステージ」の「URLを呼び出す」に記載のURLです。
IAMロール連携
以下のSQLを実行します。
DESCRIBE INTEGRATION int_cloudwatch_api;
実行結果の以下の値を控えておきます。
- APIAWSIAMUSERARN
- APIAWSEXTERNAL_ID
AWSでIAMコンソールを開き、「IAMロール作成」で作成したロールの信頼関係を以下のように編集し保存します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "<API_AWS_IAM_USER_ARN>"
},
"Action": "sts:AssumeRole",
"Condition": {"StringEquals": { "sts:ExternalId": "<API_AWS_EXTERNAL_ID>" }}
}
]
}
外部関数作成
以下のSQLを実行し、API統合をもとに外部関数send_cloudwatch_metric
を作成します。
CREATE DATABASE IF NOT EXISTS EXTERNAL_APIS;
USE EXTERNAL_APIS;
CREATE SCHEMA IF NOT EXISTS EXTERNAL_FUNCTIONS;
USE SCHEMA EXTERNAL_FUNCTIONS;
CREATE OR REPLACE EXTERNAL FUNCTION send_cloudwatch_metric(metricname STRING, value NUMBER, unit STRING, namespace STRING, dimension_json VARCHAR)
RETURNS VARIANT
API_INTEGRATION = int_cloudwatch_api
AS '<APIの呼び出しURL>';
関数実行
外部関数は通常の関数と同じようにSELECT文で実行することができます。
SELECT send_cloudwatch_metric(
'QueryCount',
100,
'None',
'Snowflake',
'[{\"Name\": \"ACCOUNT\",\"Value\": \"MY_ACCOUNT\"}]'
);
実行に成功するとメトリクスが送信され、CloudWatch の「メトリクス」に記録されます。
さいごに
Snowflake のメトリクスをモニタリングサービス(CloudWatch)に連携する方法についてご紹介しました。
外部関数を利用すると、AWS以外のクラウドサービス(Google Cloud、Azure)でも同じようにメトリクスの連携や外部サービスを利用した処理が柔軟にできるようになります。ユースケースに応じて効果的に活用してみてください。