Openflow は、任意のソースから任意のターゲットへデータを受け渡しできる機能です。Apache NiFi をベースに構築されており、外部のETLツールを介さずエンドツーエンドでETL処理を実現することができます。
本記事では、Openflow のセットアップからDB連携までの手順を詳しくご紹介します。
※執筆時点でAWSのみ利用可能です。AWS以外のクラウドを選択した場合、メニューに「Openflow」が表示されず利用できないためご注意ください。
環境構築(Openflow)
事前準備
Snowflake のワークシートで以下のSQLを実行します。
USE ROLE ACCOUNTADMIN;
-- イメージリポジトリの作成と権限付与
CREATE DATABASE IF NOT EXISTS OPENFLOW;
USE OPENFLOW;
CREATE SCHEMA IF NOT EXISTS OPENFLOW;
USE SCHEMA OPENFLOW;
CREATE IMAGE REPOSITORY IF NOT EXISTS OPENFLOW;
grant usage on database OPENFLOW to role public;
grant usage on schema OPENFLOW to role public;
grant read on image repository OPENFLOW.OPENFLOW.OPENFLOW to role public;
-- OPENFLOW_ADMIN_ROLEの作成と権限付与
CREATE ROLE IF NOT EXISTS OPENFLOW_ADMIN_ROLE;
GRANT CREATE OPENFLOW DATA PLANE INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN_ROLE;
GRANT CREATE OPENFLOW RUNTIME INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN_ROLE;
-- Openflowユーザのセカンダリロール設定
ALTER USER <user_name> SET DEFAULT_SECONDARY_ROLES = ('ALL');
※<user_name>
は自分のユーザ名に置き換えてください。
メニューの「データ」→「Openflow」を開き、「Openflowを起動」をクリックします。
「Invalid consent request」と出て開けなかった場合、ユーザのデフォルトロールがADMINロールになっているため、以下のSQLでデフォルトロールを変更します。
ALTER USER <user_name> SET DEFAULT_ROLE = 'PUBLIC';
Openflow を起動すると、以下のような画面が開きます。
ネットワーク環境構築(Bring your own VPC の場合)
AWSで、デプロイメントを作成するためのネットワーク環境を構築します。
作成するリソースは以下の通りです。
リソース | 説明 |
---|---|
VPC | Openflow 専用のネットワーク空間 |
Public Subnet x 2 | 異なる2つのAZにパブリックサブネットを配置する EKSのLB用に以下のタグを付与する: kubernetes.io/role/elb = 1 |
Private Subnet x 2 | 異なる2つのAZにプライベートサブネットを配置する |
インターネットゲートウェイ | パブリックサブネットのインターネット通信用 |
NATゲートウェイ | プライベートサブネットのインターネット通信用 |
ルートテーブル | サブネット間の通信制御 ・Public Subnet -> インターネットゲートウェイ ・Private Subnet -> NATゲートウェイ |
※Managed VPC の場合も同様の設定になっていることを確認します。
実際の設定例:
デプロイメント作成
「Create a deployment」をクリックし、デプロイメントの設定を行っていきます。
VPC構成はマネージドVPCと既存VPCのいずれかを選ぶことができます。自動作成のVPCを利用するため、「Managed VPC」を選択します。デプロイメントのオーナーロールは OPENFLOWADMINROLE を選択しておきます。
設定が完了したら、「Create deployment」をクリックします。
デプロイメントが作成されると、以下の画面が表示されます。「Download template」でテンプレートをダウンロードします。(以下の画面はDeployments一覧の「Installation details」からも確認可能)
Snowsightで以下のSQLコマンドを実行します。ネットワークポリシーを使用して Snowflake へのアクセスを制御している場合、コメントアウト部分を外して実行してください。(NATGATEWAYPUBLIC_IP は設定したNATゲートウェイのIPアドレスに置き換えてください)
/*
-- ネットワークポリシーの設定
USE ROLE ACCOUNTADMIN;
USE DATABASE {REPLACE_WITH_YOUR_DB_NAME};
CREATE NETWORK RULE allow_openflow_deployment
MODE = INGRESS
TYPE = IPV4
VALUE_LIST = ('{$NAT_GATEWAY_PUBLIC_IP}/32');
-- Run this command to find your currently active network policy and copy the value column
SHOW PARAMETERS LIKE 'NETWORK_POLICY' IN ACCOUNT;
-- Now add the new network rule to this policy
ALTER NETWORK POLICY {ENTER_YOUR_ACTIVE_NETWORK_POLICY_NAME} ADD ALLOWED_NETWORK_RULE_LIST = (allow_openflow_deployment);
*/
-- イベントテーブルの作成
USE ROLE accountadmin;
GRANT create event table on schema OPENFLOW.OPENFLOW to role $ROLE_OF_DEPLOYMENT_OWNER;
USE ROLE $ROLE_OF_DEPLOYMENT_OWNER;
CREATE event table if not exists openflow.openflow.openflow_events;
-- Find the Data Plane Integrations
SHOW openflow data plane integrations;
ALTER openflow data plane integration
$OPENFLOW_deployment_UUID
SET event_table = 'openflow.openflow.openflow_events';
ダウンロードしたテンプレートファイル(YAML)をもとに CloudFormation でスタックの作成を行います。
設定はすべてデフォルトのままで次へ進み、作成を行います。
スタックが「CREATE_COMPLETE」状態になった後、OpenflowAgentInstance のEC2が起動し、必要な残りのリソースの作成を行います。Snowflake Image Repository からのイメージダウンロードやEKSクラスターのデプロイがEC2によって自動的に実行されます。
リソースの作成が完了し、Snowflake側との接続が確立するとデプロイメントの状態が「Active」になります。(処理が問題なく進んだ場合でも、リソース作成からActive状態になるまでは45分ほどかかります)
ランタイム作成
「Create a runtime」をクリックし、ランタイムの作成を行います。
ランタイム名を入力します。その他は適切な値を設定し、「Create」をクリックします。
ランタイムの作成は約2~3分で完了します。作成が完了すると、ランタイムの状態が「Active」になります。
メニューから「View canvas」をクリックし、アクセスを許可すると Openflow canvas が開きます。
以上で Openflow の環境構築は完了です。
環境構築(データベース)
レプリケーション元となるデータベースの作成と設定を行います。
データベース作成
AWS RDS で PostgreSQL データベースを作成します。Snowflake からアクセスできるよう、パブリックアクセスを有効にしておきます。
データベースが作成されたら、「パラメータグループ」で以下の変更を保存します。(今回の場合、カスタムパラメータグループを使用しています)
rds.logical_replication
=1
「変更」→「DB パラメータグループ」で変更したパラメータグループを指定して保存し、RDSを再起動します。
レプリケーション設定
psqlまたはクライアントアプリケーションでレプリケーションの設定を行っていきます。
今回の検証では、A5:SQL Mk-2 を使用します。RDSへの接続は以下のように設定します。
接続できたら、以下のSQLコマンドを順に実行します。
-- レプリケーションが有効になっていることを確認
SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication');
name | setting |
---|---|
rds.logical_replication | on |
wal_level | logical |
-- コネクタ用のユーザー作成と権限付与
CREATE USER openflow_user WITH PASSWORD '<任意のパスワード>';
GRANT rds_replication TO openflow_user;
-- レプリケーション用テーブル作成
CREATE TABLE sample_data (
id SERIAL PRIMARY KEY,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO sample_data (content)
VALUES
('aaa'),
('bbb');
id | content | created_at |
---|---|---|
1 | aaa | 2025/06/26 17:46:32.468 |
2 | bbb | 2025/06/26 17:46:32.468 |
-- publication の作成、テーブル追加
CREATE PUBLICATION openflow_pub;
ALTER PUBLICATION openflow_pub ADD TABLE sample_data;
-- ユーザへの権限付与
GRANT USAGE ON SCHEMA public TO openflow_user;
GRANT SELECT ON TABLE sample_data TO openflow_user;
コネクタ設定
事前準備
Snowflake のワークシートで以下のSQLを実行します。
RSAキーペアの設定についてはこちらの記事の「1.認証設定」部分をご参照ください。
-- レプリケーション用データベース作成
CREATE DATABASE openflow_replicated_data;
-- レプリケーションを行うOpenflowユーザの作成と権限付与
CREATE USER openflow_service_user TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
CREATE ROLE openflow_service_role;
GRANT ROLE openflow_service_role TO USER openflow_service_user;
GRANT USAGE ON DATABASE openflow_replicated_data TO ROLE openflow_service_role;
GRANT CREATE SCHEMA ON DATABASE openflow_replicated_data TO ROLE openflow_service_role;
-- Openflowウェアハウスの作成と権限付与
CREATE WAREHOUSE openflow_compute_wh
WITH
WAREHOUSE_SIZE = 'MEDIUM'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE;
GRANT USAGE, OPERATE ON WAREHOUSE openflow_compute_wh TO ROLE openflow_service_role;
-- RSA公開鍵の設定
ALTER USER openflow_service_user SET RSA_PUBLIC_KEY = '<RSA公開鍵>';
コネクタ定義設定
こちらを参考にコネクタ定義の設定を行います。
以上でDB連携までの手順は完了です!お疲れ様でした。
トラブルシューティング
デプロイメントの状態が「Inactive」のまま進まない
openflow-agent-{data-plane-key} という名前のEC2インスタンスに接続し、エラーがないかログを確認します。
ログ確認用コマンド:
journalctl -xe -f -n 100 -u docker
journalctl -u openflow-apply-infrastructure -f -n 500
journalctl -u openflow-sync-images -f -n 500
tail -500f /var/log/cloud-init-output.log
今回の場合、以下のようなエラーが繰り返し発生していました。
[ec2-user@ip-10-10-6-22 ~]$ journalctl -xe -f -n 100 -u docker
Jun 23 06:12:24 ip-10-10-6-22.ap-northeast-1.compute.internal dockerd[27020]: time="2025-06-23T06:12:24.488934848Z" level=error msg="Upload failed: name unknown: The repository with name 'snowflake-openflow/runtime-operator' does not exist in the registry with id 'xxxxxxxxxxxx'"
原因はテンプレートファイルの環境変数の指定でした。
exists_check=$(aws ecr describe-repositories --region $AWS_REGION --repository-names $1 2>&1)
if [ $? -ne 0 ]; then
if echo ${!exists_check} | grep -q RepositoryNotFoundException; then
aws ecr create-repository --region $AWS_REGION --repository-name $1 --tags $agent_tags
else
>&2 echo ${!exists_check}
fi
fi
${!exists_check}
は関節参照で、変数exists_check
を展開した結果が格納されます。ECRリポジトリが存在しなかった場合、exists_check
には以下のようなメッセージが格納されます。
An error occurred (RepositoryNotFoundException) when calling the DescribeRepositories operation: The repository with name 'snowflake-openflow/openflow-data-plane-agent-aws' does not exist in the registry with id 'xxxxxxxxxxxx'
そのため${!exists_check}
は${An error occurred ...}
となりますが、そのような変数はないためエラーとなり、以下のif文部分のコマンド実行結果が0(正常終了)になることはありません。
echo ${!exists_check} | grep -q RepositoryNotFoundException;
よって、対象のECRリポジトリが存在しない場合、本来実行されるべき以下のコマンドが実行されず、エラーのままとなります。
aws ecr create-repository --region $AWS_REGION --repository-name $1 --tags $agent_tags
以上の内容を踏まえて、環境変数の指定誤りを修正し、再デプロイします。
テンプレートファイルを開き、以下の部分(OpenflowAgentInstance以降)をコピー&ペーストで置き換えればOKです。
▶ クリックして展開
# EC2 Instance with user data to download, extract, and run the software infrastructure installation
OpenflowAgentInstance:
Type: "AWS::EC2::Instance"
Properties:
InstanceType: "t3.medium"
IamInstanceProfile:
Ref: "EC2InstanceProfile"
ImageId: !Ref LatestAmiId
BlockDeviceMappings:
- DeviceName: /dev/xvda
Ebs:
VolumeSize: 20
VolumeType: gp3
SubnetId: !Ref "PrivateSubnet1"
Tags:
- Key: "Name"
Value: !Sub "openflow-agent-${DataPlaneKey}"
UserData:
Fn::Base64: !Sub
- |
#!/bin/bash
cat <<\EOF > /home/ec2-user/.env
#!/bin/bash
UNIQUE_SUFFIX=${DataPlaneKey}
DATA_PLANE_UUID=${DataPlaneUUID}
AWS_REGION=${AWSRegion}
DOMAIN=${DataPlaneURL}
IMAGE_REGISTRY=${AWSAccountId}.dkr.ecr.${AWSRegion}.amazonaws.com
AGENT_IMAGE_REPOSITORY=openflow-data-plane-agent-aws
AGENT_IMAGE_VERSION=${AgentImageVersion}
S3_BUCKET_NAME=${TerraformStateBucket}
OPERATOR_IMAGE_REPOSITORY=runtime-operator
OPERATOR_CHART_REPOSITORY=runtime-operator-chart
OPERATOR_CHART_VERSION=${RuntimeOperatorChartVersion}
SNOWFLAKE_LOGIN_ENABLE=true
GATEWAY_IMAGE_REPOSITORY=openflow-runtime-gateway
GATEWAY_IMAGE_VERSION=${RuntimeGatewayVersion}
SERVER_IMAGE_REPOSITORY=runtime-server
EXTENSIONS_IMAGE_REPOSITORY=runtime-extensions
SF_ORG=${SnowflakeOrganization}
SF_ACCOUNT=${SnowflakeAccount}
SF_DB=${SnowflakeDatabase}
SF_SCHEMA=${SnowflakeSchema}
SF_IMG_REPO=${SnowflakeImageRepository}
SNOWFLAKE_ACCOUNT_URL=${SnowflakeAccountURL}
SNOWFLAKE_OAUTH_INTEGRATION_NAME=${SnowflakeOAuthIntegrationName}
SNOWFLAKE_OAUTH_TOKEN_URL=${SnowflakeAccountURL}/oauth/token-request
ENABLE_TELEMETRY=true
TELEMETRY_URL=${TelemetryURL}
CONTROL_PLANE_URL=${ControlPlaneURL}
VPC_ID=${VpcId}
PRIVATE_SUBNET_1_ID=${PrivateSubnet1Id}
PRIVATE_SUBNET_2_ID=${PrivateSubnet2Id}
DPS_ENABLE=true
DPS_IMAGE_REPOSITORY=openflow-dataplane-service
DPS_CHART_REPOSITORY=openflow-dataplane-service-chart
DPS_CHART_VERSION=${DataPlaneServiceChartVersion}
DPUI_IMAGE_REPOSITORY=openflow-dataplane-ui
DPUI_CHART_REPOSITORY=openflow-dataplane-ui-chart
DPUI_CHART_VERSION=${DataPlaneUIChartVersion}
DPS_FEATURE_CONNECTOR_INSTALLATION=true
USE_IMAGE_REGISTRY_OVER_PRIVATE_LINK=${UseImageRegistryOverPrivateLink}
EOF
chown ec2-user:ec2-user /home/ec2-user/.env
chmod 644 /home/ec2-user/.env
source /home/ec2-user/.env
dnf update -y
dnf install -y docker
yum remove aws-cli -y
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
./aws/install
aws --version
rm -rf awscliv2.zip aws/
cat <<\EOF > /home/ec2-user/ensure-ecr-repo.sh
#!/bin/bash
source /home/ec2-user/.env
instance_id=$(ec2-metadata -i | cut -d ' ' -f2)
exists_check=$(aws ecr describe-repositories --region $AWS_REGION --repository-names $1 2>&1)
if [ $? -ne 0 ]; then
if echo $exists_check | grep -q RepositoryNotFoundException; then
aws ecr create-repository --region $AWS_REGION --repository-name $1
else
>&2 echo $exists_check
fi
fi
EOF
SNOWFLAKE_CLI_ARCH=x86_64
SNOWFLAKE_CLI_VERSION=3.8.3
curl "https://sfc-repo.snowflakecomputing.com/snowflake-cli/linux_$SNOWFLAKE_CLI_ARCH/$SNOWFLAKE_CLI_VERSION/snowflake-cli-$SNOWFLAKE_CLI_VERSION.$SNOWFLAKE_CLI_ARCH.rpm" -o "snowflake-cli.rpm"
rpm -i snowflake-cli.rpm
systemctl start docker
systemctl enable docker
usermod -a -G docker ec2-user
wget https://get.helm.sh/helm-v3.16.4-linux-amd64.tar.gz
tar -zxvf helm-v3.16.4-linux-amd64.tar.gz
mv ./linux-amd64/helm /usr/local/bin/helm
rm -rf helm-v3.16.4-linux-amd64.tar.gz
rm -rf linux-amd64
cat <<\EOF > /home/ec2-user/.upgrade
#!/bin/bash
AGENT_IMAGE_VERSION_UPGRADE=
OPERATOR_CHART_VERSION_UPGRADE=
GATEWAY_IMAGE_VERSION_UPGRADE=
DPS_CHART_VERSION_UPGRADE=
DPUI_CHART_VERSION_UPGRADE=
EOF
cat <<\EOF > /home/ec2-user/sync-images.sh
#!/bin/bash
source /home/ec2-user/.env
echo "Logging in to local private ECR..."
aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $IMAGE_REGISTRY
OAUTH_SECRET_ID="snowflake-oauth2-$UNIQUE_SUFFIX"
oauth2_creds=$(aws secretsmanager get-secret-value --region $AWS_REGION --query SecretString --output text --secret-id $OAUTH_SECRET_ID)
CLIENT_ID=$(echo $oauth2_creds | jq -r '.clientID')
CLIENT_SECRET=$(echo $oauth2_creds | jq -r '.clientSecret')
access_token=$(curl -s --request POST --url $SNOWFLAKE_OAUTH_TOKEN_URL \
--header 'content-type: application/x-www-form-urlencoded' \
--data-urlencode grant_type=client_credentials \
--data-urlencode client_id=$CLIENT_ID \
--data-urlencode client_secret=$CLIENT_SECRET | jq -r '.access_token')
TOKEN_PATH="/home/ec2-user/access.token"
echo -n $access_token > $TOKEN_PATH
if [[ "$USE_IMAGE_REGISTRY_OVER_PRIVATE_LINK" == true ]];
then
echo "Connecting to image registry using a private link"
private_link_param='--private-link';
else
echo "Connecting to image registry using a public endpoint"
private_link_param='';
fi
SPCS_PARAMS="""-x \
--account=$SF_ORG-$SF_ACCOUNT \
--database=$SF_DB \
--schema=$SF_SCHEMA \
--host=$(echo $SNOWFLAKE_ACCOUNT_URL | sed 's/^https:\/\///') \
--authenticator=OAUTH \
--token-file-path=$TOKEN_PATH \
--format=JSON
"""
SF_REG=$(snow spcs image-registry url $private_link_param $SPCS_PARAMS | jq -r '.message')
echo "Logging in to Snowflake Image Registry https://$SF_REG ..."
echo $access_token | docker login "https://$SF_REG" \
--username 0auth2accesstoken \
--password-stdin
echo $access_token | helm registry login "https://$SF_REG" \
--username 0auth2accesstoken \
--password-stdin
echo "Discovering tags for repo $SF_IMG_REPO"
latest_images=$(snow spcs image-repository list-images $SF_IMG_REPO $SPCS_PARAMS)
rm $TOKEN_PATH
echo "Found images\n$latest_images"
docker_keep_only() {
local image_name="$1"
local image_tag="$2"
docker images | grep "$image_name" | grep -v "$image_tag" | awk '{ print $3}' | sort | uniq | xargs docker rmi -f 2>/dev/null
}
sync_image() {
local image_name="$1"
local image_tag="$2"
SNOW_IMAGE_PATH="$SF_DB/$SF_SCHEMA/$SF_IMG_REPO/$image_name:$image_tag"
if [[ "$image_name" == *"-chart" ]]; then
registryId=$(echo $IMAGE_REGISTRY | cut -d'.' -f1)
foundImageLocalEcr="$(aws ecr describe-images --region $AWS_REGION --registry-id $registryId --repository-name "snowflake-openflow/$image_name" --image-ids=imageTag=$image_tag 2>&1)"
if [ $? -ne 0 ]; then
echo "No $image_name:$image_tag found locally, so syncing"
helm pull oci://$SF_REG/$SF_DB/$SF_SCHEMA/$SF_IMG_REPO/$image_name --version $image_tag
helm push $image_name-$image_tag.tgz oci://$IMAGE_REGISTRY/snowflake-openflow
app_version=$(helm show values $image_name-$image_tag.tgz --jsonpath "{$.image.tag}")
if [[ $OPERATOR_CHART_REPOSITORY == $image_name ]]; then
echo "Pulling app $OPERATOR_IMAGE_REPOSITORY $app_version for helm chart $image_name $image_tag"
sync_image $OPERATOR_IMAGE_REPOSITORY $app_version
elif [[ $DPS_CHART_REPOSITORY == $image_name ]]; then
echo "Pulling app $DPS_IMAGE_REPOSITORY $app_version for helm chart $image_name $image_tag"
sync_image $DPS_IMAGE_REPOSITORY $app_version
elif [[ $DPUI_CHART_REPOSITORY == $image_name ]]; then
echo "Pulling app $DPUI_IMAGE_REPOSITORY $app_version for helm chart $image_name $image_tag"
sync_image $DPUI_IMAGE_REPOSITORY $app_version
fi
rm ./$image_name-$image_tag.tgz
else
echo "Found $image_name:$image_tag in local ECR, so no need to re-sync"
fi
else
LOCAL_IMAGE_PATH=$IMAGE_REGISTRY/snowflake-openflow/$image_name:$image_tag
docker pull $SF_REG/$SNOW_IMAGE_PATH
docker tag $SF_REG/$SNOW_IMAGE_PATH $LOCAL_IMAGE_PATH
docker push $LOCAL_IMAGE_PATH
fi
}
# If specific versions are set, make sure that we sync those
if [ -n "$AGENT_IMAGE_VERSION" ]; then
sync_image $AGENT_IMAGE_REPOSITORY $AGENT_IMAGE_VERSION
fi
if [ -n "$GATEWAY_IMAGE_VERSION" ]; then
sync_image $GATEWAY_IMAGE_REPOSITORY $GATEWAY_IMAGE_VERSION
fi
if [ -n "$OPERATOR_CHART_VERSION" ]; then
sync_image $OPERATOR_CHART_REPOSITORY $OPERATOR_CHART_VERSION
fi
if [ -n "$DPS_CHART_VERSION" ]; then
sync_image $DPS_CHART_REPOSITORY $DPS_CHART_VERSION
fi
if [ -n "$DPUI_CHART_VERSION" ]; then
sync_image $DPUI_CHART_REPOSITORY $DPUI_CHART_VERSION
fi
image_repos="$AGENT_IMAGE_REPOSITORY $EXTENSIONS_IMAGE_REPOSITORY $OPERATOR_IMAGE_REPOSITORY $OPERATOR_CHART_REPOSITORY $SERVER_IMAGE_REPOSITORY $DPS_IMAGE_REPOSITORY $DPS_CHART_REPOSITORY $DPUI_IMAGE_REPOSITORY $DPUI_CHART_REPOSITORY $GATEWAY_IMAGE_REPOSITORY"
IFS=' '
for image_name in $image_repos
do
# Find only the latest tag for this image
latest_tag=$(echo "$latest_images" | jq -r ".[] | select(.image_name == \"$image_name\") | .tags" | sed -r 's/-/~/' | sort -Vr | sed -r 's/~/-/' | head -1)
sync_image $image_name $latest_tag
if [[ $AGENT_IMAGE_REPOSITORY == $image_name ]]; then
if [ -z "$AGENT_IMAGE_VERSION" ]; then
echo "Setting Agent image version to $latest_tag"
sed -i'' -Ee "s|(AGENT_IMAGE_VERSION=).*|\1$latest_tag|g" /home/ec2-user/.env
docker_keep_only $image_name $latest_tag
else
if [[ $AGENT_IMAGE_VERSION != $latest_tag ]]; then
sed -i'' -Ee "s|(AGENT_IMAGE_VERSION_UPGRADE=).*|\1$latest_tag|g" /home/ec2-user/.upgrade
fi
docker_keep_only $image_name $AGENT_IMAGE_VERSION
fi
elif [[ $GATEWAY_IMAGE_REPOSITORY == $image_name ]]; then
if [ -z "$GATEWAY_IMAGE_VERSION" ]; then
echo "Setting Gateway image version to $latest_tag"
sed -i'' -Ee "s|(GATEWAY_IMAGE_VERSION=).*|\1$latest_tag|g" /home/ec2-user/.env
docker_keep_only $image_name $latest_tag
else
if [[ $GATEWAY_IMAGE_VERSION != $latest_tag ]]; then
sed -i'' -Ee "s|(GATEWAY_IMAGE_VERSION_UPGRADE=).*|\1$latest_tag|g" /home/ec2-user/.upgrade
fi
docker_keep_only $image_name $GATEWAY_IMAGE_VERSION
fi
elif [[ $OPERATOR_CHART_REPOSITORY == $image_name ]]; then
if [ -z "$OPERATOR_CHART_VERSION" ]; then
echo "Setting Operator chart version to $latest_tag"
sed -i'' -Ee "s|(OPERATOR_CHART_VERSION=).*|\1$latest_tag|g" /home/ec2-user/.env
elif [[ $OPERATOR_CHART_VERSION != $latest_tag ]]; then
sed -i'' -Ee "s|(OPERATOR_CHART_VERSION_UPGRADE=).*|\1$latest_tag|g" /home/ec2-user/.upgrade
fi
elif [[ $DPS_CHART_REPOSITORY == $image_name ]]; then
if [ -z "$DPS_CHART_VERSION" ]; then
echo "Setting DPS chart version to $latest_tag"
sed -i'' -Ee "s|(DPS_CHART_VERSION=).*|\1$latest_tag|g" /home/ec2-user/.env
elif [[ $DPS_CHART_VERSION != $latest_tag ]]; then
sed -i'' -Ee "s|(DPS_CHART_VERSION_UPGRADE=).*|\1$latest_tag|g" /home/ec2-user/.upgrade
fi
elif [[ $DPUI_CHART_REPOSITORY == $image_name ]]; then
if [ -z "$DPUI_CHART_VERSION" ]; then
echo "Setting DP UI chart version to $latest_tag"
sed -i'' -Ee "s|(DPUI_CHART_VERSION=).*|\1$latest_tag|g" /home/ec2-user/.env
elif [[ $DPUI_CHART_VERSION != $latest_tag ]]; then
sed -i'' -Ee "s|(DPUI_CHART_VERSION_UPGRADE=).*|\1$latest_tag|g" /home/ec2-user/.upgrade
fi
else
# Delete any other versions of image from local Docker cache
docker_keep_only $image_name $latest_tag
fi
done
unset IFS
echo "Done syncing repositories!"
EOF
cat <<\EOF > /etc/systemd/system/openflow-sync-images.service
[Unit]
Description="Sync images for Snowflake Openflow"
[Service]
User=ec2-user
Group=ec2-user
TimeoutSec=600
WorkingDirectory=/home/ec2-user
ExecStart=/home/ec2-user/sync-images.sh
[Install]
WantedBy=multi-user.target
EOF
cat <<\EOF > /etc/systemd/system/openflow-sync-images.timer
[Unit]
Description="Run openflow-sync-images.service every 10 minutes relative to deactivation time"
[Timer]
OnBootSec=5min
OnUnitInactiveSec=10min
Unit=openflow-sync-images.service
[Install]
WantedBy=timers.target
EOF
cat <<\EOF > /home/ec2-user/create.sh
#!/bin/bash
source /home/ec2-user/.env
sudo systemctl enable openflow-apply-infrastructure.timer
sudo systemctl start openflow-apply-infrastructure.timer
sudo systemctl enable openflow-apply-infrastructure.service
sudo systemctl start openflow-apply-infrastructure.service
sudo journalctl -u openflow-apply-infrastructure.service -f
EOF
cat <<\EOF > /home/ec2-user/apply-infrastructure.sh
#!/bin/bash
source /home/ec2-user/.env
# Run in the foreground
docker run --name apply_infra --log-driver=journald --env-file /home/ec2-user/.env --rm "$IMAGE_REGISTRY/snowflake-openflow/$AGENT_IMAGE_REPOSITORY:$AGENT_IMAGE_VERSION" /opt/openflow/create.sh
EOF
cat <<\EOF > /etc/systemd/system/openflow-apply-infrastructure.service
[Unit]
Description="Apply baseline Snowflake Openflow infrastructure configuration"
[Service]
User=ec2-user
Group=ec2-user
TimeoutSec=0
WorkingDirectory=/home/ec2-user
ExecStart=/home/ec2-user/apply-infrastructure.sh
[Install]
WantedBy=multi-user.target
EOF
cat <<\EOF > /etc/systemd/system/openflow-apply-infrastructure.timer
[Unit]
Description="Run openflow-apply-infrastructure.service every 10 minutes relative to deactivation time"
[Timer]
OnBootSec=5min
OnUnitInactiveSec=10min
Unit=openflow-apply-infrastructure.service
[Install]
WantedBy=timers.target
EOF
cat <<\EOF > /home/ec2-user/destroy.sh
#!/bin/bash
source /home/ec2-user/.env
read -rp $'\nAre you sure you want to delete the Data Plane and all Runtimes present? [Y/n]: ' proceed
if [ -z "$proceed" ]; then
proceed="Y"
fi
[[ ! $proceed =~ ^[Yy]$ ]] && { echo -e "\nOperation canceled, aborting teardown procedure!"; echo; exit 1; }
# Stop auto-apply of infrastructure baseline
sudo systemctl disable openflow-apply-infrastructure.timer
sudo systemctl stop openflow-apply-infrastructure.timer
sudo systemctl disable openflow-apply-infrastructure.service
sudo systemctl stop openflow-apply-infrastructure.service
container_id=$(docker run --log-driver=journald --env-file /home/ec2-user/.env -d --rm "$IMAGE_REGISTRY/snowflake-openflow/$AGENT_IMAGE_REPOSITORY:$AGENT_IMAGE_VERSION" /bin/bash -c "/opt/openflow/cli/update-kubeconfig.sh; /opt/openflow/destroy.sh")
docker logs -f $container_id
EOF
cat <<\EOF > /home/ec2-user/diagnostics.sh
#!/bin/bash
source /home/ec2-user/.env
docker run -it --env-file /home/ec2-user/.env --rm -w "/opt/openflow/cli" "$IMAGE_REGISTRY/snowflake-openflow/$AGENT_IMAGE_REPOSITORY:$AGENT_IMAGE_VERSION" /bin/bash -c "/opt/openflow/cli/update-kubeconfig.sh; ./diagnostics.sh"
EOF
cat <<\EOF > /home/ec2-user/upgrade-data-plane.sh
#!/bin/bash
set -o errexit
source ~/.env
source ~/.upgrade
images="[
{\"repo\":\"$AGENT_IMAGE_REPOSITORY\",\"version_env\":\"AGENT_IMAGE_VERSION\"},
{\"repo\":\"$DPS_CHART_REPOSITORY\",\"version_env\":\"DPS_CHART_VERSION\"},
{\"repo\":\"$DPUI_CHART_REPOSITORY\",\"version_env\":\"DPUI_CHART_VERSION\"},
{\"repo\":\"$GATEWAY_IMAGE_REPOSITORY\",\"version_env\":\"GATEWAY_IMAGE_VERSION\"},
{\"repo\":\"$OPERATOR_CHART_REPOSITORY\",\"version_env\":\"OPERATOR_CHART_VERSION\"}
]"
echo "$images" | jq -c -r '.[]' | while read item; do
repo=$(jq -r '.repo' <<< $item)
version_env=$(jq -r '.version_env' <<< $item)
version="${!!version_env}"
latestVersionVar="$version_env"_UPGRADE
latestVersion="${!!latestVersionVar}"
echo "$repo is set to version $version"
if [[ -z $latestVersion || $version == $latestVersion ]]; then
echo -e "\tNo upgrade is available"
else
sed -i'' -Ee "s|($version_env=).*|\1$latestVersion|g" /home/ec2-user/.env
echo -e "\tUpgrade set to version $latestVersion"
fi
sed -i'' -Ee "s|(""$version_env""_UPGRADE=).*|\1|g" /home/ec2-user/.upgrade
done
echo -e "\n*****\nUpgrades are applied during the next cycle:"
sudo systemctl list-timers openflow-apply-infrastructure | head -n 2
EOF
chmod +x /home/ec2-user/*.sh
chown ec2-user:ec2-user /home/ec2-user/*.sh
image_repos="$AGENT_IMAGE_REPOSITORY $EXTENSIONS_IMAGE_REPOSITORY $OPERATOR_IMAGE_REPOSITORY $OPERATOR_CHART_REPOSITORY $SERVER_IMAGE_REPOSITORY $DPS_IMAGE_REPOSITORY $DPS_CHART_REPOSITORY $DPUI_IMAGE_REPOSITORY $DPUI_CHART_REPOSITORY $GATEWAY_IMAGE_REPOSITORY"
IFS=' '
for i in $image_repos
do
/home/ec2-user/ensure-ecr-repo.sh snowflake-openflow/$i
done
unset IFS
/home/ec2-user/sync-images.sh
systemctl daemon-reload
systemctl enable openflow-sync-images.timer
systemctl start openflow-sync-images.timer
systemctl enable openflow-sync-images.service
systemctl start openflow-sync-images.service
systemctl enable openflow-apply-infrastructure.timer
systemctl start openflow-apply-infrastructure.timer
systemctl enable openflow-apply-infrastructure.service
systemctl start openflow-apply-infrastructure.service
- AWSAccountId: !Ref AWS::AccountId
AWSRegion: !Ref AWS::Region
TerraformStateBucket: !Ref TerraformStateBucket
DataPlaneKey: !Ref DataPlaneKey
DataPlaneUUID: !Ref DataPlaneUUID
DataPlaneURL: !Ref DataPlaneURL
AgentImageVersion: !Ref AgentImageVersion
DataPlaneServiceChartVersion: !Ref DataPlaneServiceChartVersion
DataPlaneUIChartVersion: !Ref DataPlaneUIChartVersion
RuntimeOperatorChartVersion: !Ref RuntimeOperatorChartVersion
RuntimeGatewayVersion: !Ref RuntimeGatewayVersion
SnowflakeOrganization: !Ref SnowflakeOrganization
SnowflakeAccount: !Ref SnowflakeAccount
SnowflakeDatabase: !Ref SnowflakeDatabase
SnowflakeSchema: !Ref SnowflakeSchema
SnowflakeImageRepository: !Ref SnowflakeImageRepository
SnowflakeAccountURL: !Ref SnowflakeAccountURL
SnowflakeOAuthIntegrationName: !Ref SnowflakeOAuthIntegrationName
TelemetryURL: !Ref TelemetryURL
ControlPlaneURL: !Ref ControlPlaneURL
VpcId: !Ref InfraVPC
PrivateSubnet1Id: !Ref PrivateSubnet1
PrivateSubnet2Id: !Ref PrivateSubnet2
UseImageRegistryOverPrivateLink: !Ref UseImageRegistryOverPrivateLink
※本来のものから間接参照その他を修正しています。
課金について
便利な Openflow ですが、課金には注意が必要です。
Openflow の課金対象要素は以下の通りです。
種類 | 説明 |
---|---|
サービス | Openflow のアクティブなコネクタランタイムに対して発生する費用 |
基盤 | AWS の場合、以下で費用が発生する。
|
取り込み | データの取り込みに対して発生する費用 |
テレメトリー | イベントのログやメトリクスに対して発生する費用 |
特に注意が必要なのは基盤部分です。
画像:Understanding Openflow costs | Snowflake Documentationより引用
公式ドキュメントにもあるように、ランタイムを作成した時点で以下の3種類のインスタンスが起動します。
- エージェントインスタンス(コンテナイメージの最新化、インフラ構築)
- 管理コンテナ(Openflowアプリケーション全体の管理)
- ランタイムコンテナ(レプリケーション処理実行)
各インスタンスはデフォルトで常時稼働状態であるため、使用しない時間帯は停止するなどして、課金の発生を抑える必要があります。
例:PostgreSQL (RDS) の100GBのデータを1か月間継続ロードした場合
- サービス:0.0225 * 1 * 24 * 30 * $4.30 = $69.66
- 基盤
- VPC:NATゲートウェイの合計データ処理量が200GBと仮定して、$0.062 * 24 * 30 + 200 * $0.062 = $57.04
- EC2:
- t3.medium:$0.0136 * 24 * 30 = $9.792
- m7i.xlarge:$0.2604 * 24 * 30 = $187.488
- m7i.2xlarge:$0.5208 * 24 * 30 = $374.976
- EKS:クラスター1つに対し、$0.10 * 24 * 30 = $72
- S3:ログの最大出力量を1GBとして、$0.025 * 1 = $0.025
- 取り込み:Snowpipe Streaming で合計データ処理量200GBと仮定して、0.0037 * 200 * $4.30 = $3.182
- テレメトリー:ログの最大出力量を1GBとして、0.02 * 1 * $4.30 = $0.086
上記を合計すると、$774.249 ≒ 108,395円(1ドル140円)
クレジットの詳細:CreditConsumptionTable.pdf
計算例でもわかる通り、EC2部分の料金が大半を占めています。最小の構成でもそこそこ大きなインスタンスが立ってしまうので、注意する必要があります。
さいごに
Openflow の使い方について、詳しくご紹介しました。
Openflow は多くのコネクタが提供されており、様々なデータソースを一元的に集約して Snowflake に取り込むことができる便利なサービスですが、デフォルトである程度パワフルな構成になっているため、想像以上に課金が発生する点については注意が必要です。
特定のデータソースの連携が目的であれば、こちらの記事でもご紹介しているように Snowpipe Streaming といったサービスを使う方法もあります。ご利用のユースケースに応じて、最適な方法を選択するようにしてください。