前回の記事に引き続き、Snowflake のログを CloudWatch に連携する方法についてご紹介します。
外部関数作成までの基本的な流れは同じです。今回はアクセス履歴を10分ごとに取得し自動で CloudWatch に連携するため、プロシージャの作成とタスクの登録も行います。
Lambda関数
前回の記事で作成したLambda関数のPythonコードを以下に置き換えます。
import json
import datetime, decimal
import boto3
def default_json_transform(obj):
if isinstance(obj, decimal.Decimal):
return str(obj)
if isinstance(obj, (datetime.date, datetime.datetime)):
return obj.isoformat()
raise TypeError
def lambda_handler(event, context):
array_of_rows_to_return = []
status_code = 200
event_body = event["body"]
payload = json.loads(event_body)
for row in payload["data"]:
row_number = row[0]
groupName = row[1]
streamName = row[2]
dimensionsArray = row[3]
dimensionDict = json.loads(dimensionsArray)
metricResponse = send_cloudwatch_logs(groupName, streamName, dimensionDict)
row_to_return = [row_number, metricResponse]
array_of_rows_to_return.append(row_to_return)
json_compatible_string_to_return = json.dumps({"data" : array_of_rows_to_return}, default=default_json_transform)
return {
'statusCode': status_code,
'body': json_compatible_string_to_return
}
def send_cloudwatch_logs(groupName, streamName, dimensionDict):
log_client = boto3.client('logs')
response = log_client.put_log_events(
logGroupName=groupName,
logStreamName=streamName,
logEvents=[{
'timestamp': int(datetime.datetime.now().timestamp() * 1000),
'message': json.dumps(dimensionDict)
}]
)
return response
今回はCloudWatchロググループへのログ出力を行うので、Lambdaの実行ロールに以下のポリシーがアタッチされていることを確認します。
snowflake-cloudwatch-log-policy
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"cloudwatch:PutMetricData",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
また、書き込み先のロググループとログストリームを以下のように作成しておきます。
- ロググループ:snowflake-cloudwatch-logs
- ログストリーム:access-history
外部関数
以下のSQLを実行し、ログ送信用の外部関数send_cloudwatch_logs
を作成します。
CREATE DATABASE IF NOT EXISTS EXTERNAL_APIS;
USE EXTERNAL_APIS;
CREATE SCHEMA IF NOT EXISTS EXTERNAL_FUNCTIONS;
USE SCHEMA EXTERNAL_FUNCTIONS;
CREATE OR REPLACE EXTERNAL FUNCTION send_cloudwatch_logs(groupname STRING, streamname STRING, dimension_json VARCHAR)
RETURNS VARIANT
API_INTEGRATION = int_cloudwatch_api
AS '<APIの呼び出しURL>';
プロシージャ
以下のSQLを実行し、プロシージャを登録します。
直近10分以内のアクセス履歴を取得するプロシージャです。
CREATE OR REPLACE PROCEDURE cloudwatch_logs_query()
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS $$
result = '';
try {
// アクセス履歴取得(直近10分以内)
access_history_query =
`SELECT *
FROM SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY
WHERE QUERY_START_TIME >= DATEADD(minute, -10, CURRENT_TIMESTAMP)
ORDER BY QUERY_START_TIME;`;
access_history = snowflake.execute({sqlText: access_history_query});
while (access_history.next()) {
// クエリ結果の取得
result_json = {
query_id: access_history.getColumnValue(1),
query_start_time: access_history.getColumnValue(2),
user_name: access_history.getColumnValue(3),
direct_objects_accessed: access_history.getColumnValue(4),
base_objects_accessed: access_history.getColumnValue(5),
objects_modified: access_history.getColumnValue(6),
objects_modified_by_ddl: access_history.getColumnValue(7),
policies_referenced: access_history.getColumnValue(8),
parent_query_id: access_history.getColumnValue(9),
root_query_id: access_history.getColumnValue(10)
}
// アクセス履歴の送信
send_logs_query = `SELECT send_cloudwatch_logs(
\'snowflake-cloudwatch-logs\',
\'access-history\',
\'${JSON.stringify(result_json)}\'
)`;
snowflake.execute({sqlText: send_logs_query})
}
result = 'Success';
}
catch (err) {
result = "Failed: Code: " + err.code + "\nState: " + err.state;
result += "\nMessage: " + err.message;
result += "\nStack Trace:\n" + err.stackTraceTxt;
}
return result;
$$;
通常のプロシージャ実行は以下のコマンドで実行可能です。
CALL cloudwatch_logs_query();
タスクの作成と登録
タスクを登録することで、プロシージャを定期実行させることができます。
以下のSQLを実行し、タスクの作成と有効化を行います。
-- タスク作成
CREATE OR REPLACE TASK cloudwatch_logs_task
WAREHOUSE = compute_wh -- 適切なウェアハウスを指定
SCHEDULE = 'USING CRON */10 * * * * UTC' -- 10分ごとに実行 (UTCタイムゾーン)
AS
CALL cloudwatch_logs_query();
-- タスク有効化
ALTER TASK cloudwatch_logs_task RESUME;
-- タスク一時停止
-- ALTER TASK oracle_replication_task SUSPEND;
タスクの詳細は以下のコマンドで確認可能です。
SHOW TASKS LIKE 'cloudwatch_logs_task';
動作確認
タスクが実行されると、以下のように対象のログストリームにログが出力されます。
さいごに
Snowflake の監査ログをモニタリングサービス(CloudWatch)に連携する方法についてご紹介しました。
モニタリングサービスに連携することで、必要なログを一元管理できることに加え、ログの分析もしやすくなります。
また、外部関数を利用してログを連携できるだけでなく、プロシージャとタスクを組み合わせることで処理の定期実行が可能となり、運用を効率化することができます。Snowflake のログ連携でお悩みの方は、本記事の内容を参考に外部関数の利用を検討してみてください。