レック・テクノロジー・コンサルティング株式会社TECH BLOG

Snowflakeの監査ログをCloudWatchに連携する方法

前回の記事に引き続き、Snowflake のログを CloudWatch に連携する方法についてご紹介します。

外部関数作成までの基本的な流れは同じです。今回はアクセス履歴を10分ごとに取得し自動で CloudWatch に連携するため、プロシージャの作成とタスクの登録も行います。

Lambda関数

前回の記事で作成したLambda関数のPythonコードを以下に置き換えます。

import json
import datetime, decimal

import boto3

def default_json_transform(obj):
    if isinstance(obj, decimal.Decimal):
        return str(obj)
    if isinstance(obj, (datetime.date, datetime.datetime)):
        return obj.isoformat()

    raise TypeError

def lambda_handler(event, context):
    array_of_rows_to_return = []
    status_code = 200

    event_body = event["body"]
    payload = json.loads(event_body)

    for row in payload["data"]:
        row_number = row[0]
        groupName = row[1]
        streamName = row[2]
        dimensionsArray = row[3]

        dimensionDict = json.loads(dimensionsArray)
        metricResponse = send_cloudwatch_logs(groupName, streamName, dimensionDict)

        row_to_return = [row_number, metricResponse]
        array_of_rows_to_return.append(row_to_return)

    json_compatible_string_to_return = json.dumps({"data" : array_of_rows_to_return}, default=default_json_transform)

    return {
        'statusCode': status_code,
        'body': json_compatible_string_to_return
    }

def send_cloudwatch_logs(groupName, streamName, dimensionDict):
    log_client = boto3.client('logs')

    response = log_client.put_log_events(
        logGroupName=groupName,
        logStreamName=streamName,
        logEvents=[{
            'timestamp': int(datetime.datetime.now().timestamp() * 1000),
            'message': json.dumps(dimensionDict)
        }]
    )

    return response

今回はCloudWatchロググループへのログ出力を行うので、Lambdaの実行ロールに以下のポリシーがアタッチされていることを確認します。

snowflake-cloudwatch-log-policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}

また、書き込み先のロググループとログストリームを以下のように作成しておきます。

  • ロググループ:snowflake-cloudwatch-logs
  • ログストリーム:access-history

外部関数

以下のSQLを実行し、ログ送信用の外部関数send_cloudwatch_logsを作成します。

CREATE DATABASE IF NOT EXISTS EXTERNAL_APIS;
USE EXTERNAL_APIS;
CREATE SCHEMA IF NOT EXISTS EXTERNAL_FUNCTIONS;
USE SCHEMA EXTERNAL_FUNCTIONS;

CREATE OR REPLACE EXTERNAL FUNCTION send_cloudwatch_logs(groupname STRING, streamname STRING, dimension_json VARCHAR)
  RETURNS VARIANT
  API_INTEGRATION = int_cloudwatch_api
  AS '<APIの呼び出しURL>';

プロシージャ

以下のSQLを実行し、プロシージャを登録します。
直近10分以内のアクセス履歴を取得するプロシージャです。

CREATE OR REPLACE PROCEDURE cloudwatch_logs_query()
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    EXECUTE AS CALLER
    AS $$
    result = '';

    try {
        // アクセス履歴取得(直近10分以内)
        access_history_query = 
        `SELECT *
        FROM SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY
        WHERE QUERY_START_TIME >= DATEADD(minute, -10, CURRENT_TIMESTAMP)
        ORDER BY QUERY_START_TIME;`;

        access_history = snowflake.execute({sqlText: access_history_query});

        while (access_history.next()) {
            // クエリ結果の取得
            result_json = {
                query_id: access_history.getColumnValue(1),
                query_start_time: access_history.getColumnValue(2),
                user_name: access_history.getColumnValue(3),
                direct_objects_accessed: access_history.getColumnValue(4),
                base_objects_accessed: access_history.getColumnValue(5),
                objects_modified: access_history.getColumnValue(6),
                objects_modified_by_ddl: access_history.getColumnValue(7),
                policies_referenced: access_history.getColumnValue(8),
                parent_query_id: access_history.getColumnValue(9),
                root_query_id: access_history.getColumnValue(10)
            }

            // アクセス履歴の送信
            send_logs_query = `SELECT send_cloudwatch_logs(
                \'snowflake-cloudwatch-logs\',
                \'access-history\',
                \'${JSON.stringify(result_json)}\'
            )`;

            snowflake.execute({sqlText: send_logs_query})
        }

        result = 'Success';
    }
    catch (err) {
        result =  "Failed: Code: " + err.code + "\nState: " + err.state;
        result += "\nMessage: " + err.message;
        result += "\nStack Trace:\n" + err.stackTraceTxt;
    }

    return result;
$$;

通常のプロシージャ実行は以下のコマンドで実行可能です。

CALL cloudwatch_logs_query();

タスクの作成と登録

タスクを登録することで、プロシージャを定期実行させることができます。

以下のSQLを実行し、タスクの作成と有効化を行います。

-- タスク作成
CREATE OR REPLACE TASK cloudwatch_logs_task
  WAREHOUSE = compute_wh  -- 適切なウェアハウスを指定
  SCHEDULE = 'USING CRON */10 * * * * UTC'  -- 10分ごとに実行 (UTCタイムゾーン)
AS
  CALL cloudwatch_logs_query();

-- タスク有効化
ALTER TASK cloudwatch_logs_task RESUME;

-- タスク一時停止
-- ALTER TASK oracle_replication_task SUSPEND;

タスクの詳細は以下のコマンドで確認可能です。

SHOW TASKS LIKE 'cloudwatch_logs_task';

動作確認

タスクが実行されると、以下のように対象のログストリームにログが出力されます。

image.png

さいごに

Snowflake の監査ログをモニタリングサービス(CloudWatch)に連携する方法についてご紹介しました。

モニタリングサービスに連携することで、必要なログを一元管理できることに加え、ログの分析もしやすくなります。

また、外部関数を利用してログを連携できるだけでなく、プロシージャとタスクを組み合わせることで処理の定期実行が可能となり、運用を効率化することができます。Snowflake のログ連携でお悩みの方は、本記事の内容を参考に外部関数の利用を検討してみてください。

参考

この記事をシェアする

  • Facebook
  • X
  • Pocket
  • Line
  • Hatena
  • Linkedin

資料請求・お問い合わせはこちら

ページトップへ戻る