レック・テクノロジー・コンサルティング株式会社TECH BLOG

Snowflake Openflowのセットアップ~DB連携まで

Openflow は、任意のソースから任意のターゲットへデータを受け渡しできる機能です。Apache NiFi をベースに構築されており、外部のETLツールを介さずエンドツーエンドでETL処理を実現することができます。

本記事では、Openflow のセットアップからDB連携までの手順を詳しくご紹介します。

※執筆時点でAWSのみ利用可能です。AWS以外のクラウドを選択した場合、メニューに「Openflow」が表示されず利用できないためご注意ください。

環境構築(Openflow)

事前準備

Snowflake のワークシートで以下のSQLを実行します。

USE ROLE ACCOUNTADMIN;

-- イメージリポジトリの作成と権限付与
CREATE DATABASE IF NOT EXISTS OPENFLOW;
USE OPENFLOW;
CREATE SCHEMA IF NOT EXISTS OPENFLOW;
USE SCHEMA OPENFLOW;
CREATE IMAGE REPOSITORY IF NOT EXISTS OPENFLOW;
grant usage on database OPENFLOW to role public;
grant usage on schema OPENFLOW to role public;
grant read on image repository OPENFLOW.OPENFLOW.OPENFLOW to role public;

-- OPENFLOW_ADMIN_ROLEの作成と権限付与
CREATE ROLE IF NOT EXISTS OPENFLOW_ADMIN_ROLE;
GRANT CREATE OPENFLOW DATA PLANE INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN_ROLE;
GRANT CREATE OPENFLOW RUNTIME INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN_ROLE;

-- Openflowユーザのセカンダリロール設定
ALTER USER <user_name> SET DEFAULT_SECONDARY_ROLES = ('ALL');

<user_name>は自分のユーザ名に置き換えてください。

メニューの「データ」→「Openflow」を開き、「Openflowを起動」をクリックします。

image.png

「Invalid consent request」と出て開けなかった場合、ユーザのデフォルトロールがADMINロールになっているため、以下のSQLでデフォルトロールを変更します。

ALTER USER <user_name> SET DEFAULT_ROLE = 'PUBLIC';

Openflow を起動すると、以下のような画面が開きます。

image-1.png

ネットワーク環境構築(Bring your own VPC の場合)

AWSで、デプロイメントを作成するためのネットワーク環境を構築します。

作成するリソースは以下の通りです。

リソース 説明
VPC Openflow 専用のネットワーク空間
Public Subnet x 2 異なる2つのAZにパブリックサブネットを配置する
EKSのLB用に以下のタグを付与する:
kubernetes.io/role/elb = 1
Private Subnet x 2 異なる2つのAZにプライベートサブネットを配置する
インターネットゲートウェイ パブリックサブネットのインターネット通信用
NATゲートウェイ プライベートサブネットのインターネット通信用
ルートテーブル サブネット間の通信制御
・Public Subnet -> インターネットゲートウェイ
・Private Subnet -> NATゲートウェイ

※Managed VPC の場合も同様の設定になっていることを確認します。

実際の設定例:

image-7.png

デプロイメント作成

「Create a deployment」をクリックし、デプロイメントの設定を行っていきます。

image-2.png

VPC構成はマネージドVPCと既存VPCのいずれかを選ぶことができます。自動作成のVPCを利用するため、「Managed VPC」を選択します。デプロイメントのオーナーロールは OPENFLOWADMINROLE を選択しておきます。

image-3.png

設定が完了したら、「Create deployment」をクリックします。

デプロイメントが作成されると、以下の画面が表示されます。「Download template」でテンプレートをダウンロードします。(以下の画面はDeployments一覧の「Installation details」からも確認可能)

image-4.png

Snowsightで以下のSQLコマンドを実行します。ネットワークポリシーを使用して Snowflake へのアクセスを制御している場合、コメントアウト部分を外して実行してください。(NATGATEWAYPUBLIC_IP は設定したNATゲートウェイのIPアドレスに置き換えてください)

/*
-- ネットワークポリシーの設定
USE ROLE ACCOUNTADMIN;
USE DATABASE {REPLACE_WITH_YOUR_DB_NAME};

CREATE NETWORK RULE allow_openflow_deployment
MODE = INGRESS
TYPE = IPV4
VALUE_LIST = ('{$NAT_GATEWAY_PUBLIC_IP}/32');

-- Run this command to find your currently active network policy and copy the value column
SHOW PARAMETERS LIKE 'NETWORK_POLICY' IN ACCOUNT;
-- Now add the new network rule to this policy
ALTER NETWORK POLICY {ENTER_YOUR_ACTIVE_NETWORK_POLICY_NAME} ADD ALLOWED_NETWORK_RULE_LIST = (allow_openflow_deployment);
*/

-- イベントテーブルの作成
USE ROLE accountadmin;
GRANT create event table on schema OPENFLOW.OPENFLOW to role $ROLE_OF_DEPLOYMENT_OWNER;

USE ROLE $ROLE_OF_DEPLOYMENT_OWNER;
CREATE event table if not exists openflow.openflow.openflow_events;

-- Find the Data Plane Integrations
SHOW openflow data plane integrations;

ALTER openflow data plane integration
$OPENFLOW_deployment_UUID
SET event_table = 'openflow.openflow.openflow_events';

ダウンロードしたテンプレートファイル(YAML)をもとに CloudFormation でスタックの作成を行います。

image-5.png

設定はすべてデフォルトのままで次へ進み、作成を行います。

スタックが「CREATE_COMPLETE」状態になった後、OpenflowAgentInstance のEC2が起動し、必要な残りのリソースの作成を行います。Snowflake Image Repository からのイメージダウンロードやEKSクラスターのデプロイがEC2によって自動的に実行されます。

リソースの作成が完了し、Snowflake側との接続が確立するとデプロイメントの状態が「Active」になります。(処理が問題なく進んだ場合でも、リソース作成からActive状態になるまでは45分ほどかかります)

image-6.png

ランタイム作成

「Create a runtime」をクリックし、ランタイムの作成を行います。

image-9.png

ランタイム名を入力します。その他は適切な値を設定し、「Create」をクリックします。

image-10.png

ランタイムの作成は約2~3分で完了します。作成が完了すると、ランタイムの状態が「Active」になります。

image-11.png

メニューから「View canvas」をクリックし、アクセスを許可すると Openflow canvas が開きます。

image-12.png

image-13.png

以上で Openflow の環境構築は完了です。

環境構築(データベース)

レプリケーション元となるデータベースの作成と設定を行います。

データベース作成

AWS RDS で PostgreSQL データベースを作成します。Snowflake からアクセスできるよう、パブリックアクセスを有効にしておきます。

データベースが作成されたら、「パラメータグループ」で以下の変更を保存します。(今回の場合、カスタムパラメータグループを使用しています)

rds.logical_replication=1

image-15.png

「変更」→「DB パラメータグループ」で変更したパラメータグループを指定して保存し、RDSを再起動します。

レプリケーション設定

psqlまたはクライアントアプリケーションでレプリケーションの設定を行っていきます。

今回の検証では、A5:SQL Mk-2 を使用します。RDSへの接続は以下のように設定します。

image-16.png

image-17.png

接続できたら、以下のSQLコマンドを順に実行します。

-- レプリケーションが有効になっていることを確認
SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication');
name setting
rds.logical_replication on
wal_level logical
-- コネクタ用のユーザー作成と権限付与
CREATE USER openflow_user WITH PASSWORD '<任意のパスワード>';
GRANT rds_replication TO openflow_user;
-- レプリケーション用テーブル作成
CREATE TABLE sample_data (
    id SERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO sample_data (content)
VALUES
    ('aaa'),
    ('bbb');
id content created_at
1 aaa 2025/06/26 17:46:32.468
2 bbb 2025/06/26 17:46:32.468
-- publication の作成、テーブル追加
CREATE PUBLICATION openflow_pub;
ALTER PUBLICATION openflow_pub ADD TABLE sample_data;

-- ユーザへの権限付与
GRANT USAGE ON SCHEMA public TO openflow_user;
GRANT SELECT ON TABLE sample_data TO openflow_user;

コネクタ設定

事前準備

Snowflake のワークシートで以下のSQLを実行します。

RSAキーペアの設定についてはこちらの記事の「1.認証設定」部分をご参照ください。

-- レプリケーション用データベース作成
CREATE DATABASE openflow_replicated_data;

-- レプリケーションを行うOpenflowユーザの作成と権限付与
CREATE USER openflow_service_user TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
CREATE ROLE openflow_service_role;
GRANT ROLE openflow_service_role TO USER openflow_service_user;
GRANT USAGE ON DATABASE openflow_replicated_data TO ROLE openflow_service_role;
GRANT CREATE SCHEMA ON DATABASE openflow_replicated_data TO ROLE openflow_service_role;

-- Openflowウェアハウスの作成と権限付与
CREATE WAREHOUSE openflow_compute_wh
  WITH
    WAREHOUSE_SIZE = 'MEDIUM'
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE;
GRANT USAGE, OPERATE ON WAREHOUSE openflow_compute_wh TO ROLE openflow_service_role;

-- RSA公開鍵の設定
ALTER USER openflow_service_user SET RSA_PUBLIC_KEY = '<RSA公開鍵>';

コネクタ定義設定

こちらを参考にコネクタ定義の設定を行います。

以上でDB連携までの手順は完了です!お疲れ様でした。

トラブルシューティング

デプロイメントの状態が「Inactive」のまま進まない

openflow-agent-{data-plane-key} という名前のEC2インスタンスに接続し、エラーがないかログを確認します。

image-8.png

ログ確認用コマンド:

journalctl -xe -f -n 100 -u docker
journalctl -u openflow-apply-infrastructure -f -n 500
journalctl -u openflow-sync-images -f -n 500
tail -500f /var/log/cloud-init-output.log

今回の場合、以下のようなエラーが繰り返し発生していました。

[ec2-user@ip-10-10-6-22 ~]$ journalctl -xe -f -n 100 -u docker
Jun 23 06:12:24 ip-10-10-6-22.ap-northeast-1.compute.internal dockerd[27020]: time="2025-06-23T06:12:24.488934848Z" level=error msg="Upload failed: name unknown: The repository with name 'snowflake-openflow/runtime-operator' does not exist in the registry with id 'xxxxxxxxxxxx'"

原因はテンプレートファイルの環境変数の指定でした。

exists_check=$(aws ecr describe-repositories --region $AWS_REGION --repository-names $1 2>&1)
if [ $? -ne 0 ]; then
    if echo ${!exists_check} | grep -q RepositoryNotFoundException; then
    aws ecr create-repository --region $AWS_REGION --repository-name $1 --tags $agent_tags
    else
    >&2 echo ${!exists_check}
    fi
fi

${!exists_check}は関節参照で、変数exists_checkを展開した結果が格納されます。ECRリポジトリが存在しなかった場合、exists_checkには以下のようなメッセージが格納されます。

An error occurred (RepositoryNotFoundException) when calling the DescribeRepositories operation: The repository with name 'snowflake-openflow/openflow-data-plane-agent-aws' does not exist in the registry with id 'xxxxxxxxxxxx'

そのため${!exists_check}${An error occurred ...}となりますが、そのような変数はないためエラーとなり、以下のif文部分のコマンド実行結果が0(正常終了)になることはありません。

echo ${!exists_check} | grep -q RepositoryNotFoundException;

よって、対象のECRリポジトリが存在しない場合、本来実行されるべき以下のコマンドが実行されず、エラーのままとなります。

aws ecr create-repository --region $AWS_REGION --repository-name $1 --tags $agent_tags

以上の内容を踏まえて、環境変数の指定誤りを修正し、再デプロイします。

テンプレートファイルを開き、以下の部分(OpenflowAgentInstance以降)をコピー&ペーストで置き換えればOKです。

▶ クリックして展開

  # EC2 Instance with user data to download, extract, and run the software infrastructure installation
  OpenflowAgentInstance:
    Type: "AWS::EC2::Instance"
    Properties:
      InstanceType: "t3.medium"
      IamInstanceProfile: 
        Ref: "EC2InstanceProfile"
      ImageId: !Ref LatestAmiId
      BlockDeviceMappings:
        - DeviceName: /dev/xvda
          Ebs:
            VolumeSize: 20
            VolumeType: gp3
      SubnetId: !Ref "PrivateSubnet1"
      Tags:
        - Key: "Name"
          Value: !Sub "openflow-agent-${DataPlaneKey}"
      UserData:
        Fn::Base64: !Sub
          - |
            #!/bin/bash
            cat <<\EOF > /home/ec2-user/.env
            #!/bin/bash
            UNIQUE_SUFFIX=${DataPlaneKey}
            DATA_PLANE_UUID=${DataPlaneUUID}
            AWS_REGION=${AWSRegion}
            DOMAIN=${DataPlaneURL}
            IMAGE_REGISTRY=${AWSAccountId}.dkr.ecr.${AWSRegion}.amazonaws.com
            AGENT_IMAGE_REPOSITORY=openflow-data-plane-agent-aws
            AGENT_IMAGE_VERSION=${AgentImageVersion}
            S3_BUCKET_NAME=${TerraformStateBucket}
            OPERATOR_IMAGE_REPOSITORY=runtime-operator
            OPERATOR_CHART_REPOSITORY=runtime-operator-chart
            OPERATOR_CHART_VERSION=${RuntimeOperatorChartVersion}
            SNOWFLAKE_LOGIN_ENABLE=true
            GATEWAY_IMAGE_REPOSITORY=openflow-runtime-gateway
            GATEWAY_IMAGE_VERSION=${RuntimeGatewayVersion}
            SERVER_IMAGE_REPOSITORY=runtime-server
            EXTENSIONS_IMAGE_REPOSITORY=runtime-extensions
            SF_ORG=${SnowflakeOrganization}
            SF_ACCOUNT=${SnowflakeAccount}
            SF_DB=${SnowflakeDatabase}
            SF_SCHEMA=${SnowflakeSchema}
            SF_IMG_REPO=${SnowflakeImageRepository}
            SNOWFLAKE_ACCOUNT_URL=${SnowflakeAccountURL}
            SNOWFLAKE_OAUTH_INTEGRATION_NAME=${SnowflakeOAuthIntegrationName}
            SNOWFLAKE_OAUTH_TOKEN_URL=${SnowflakeAccountURL}/oauth/token-request
            ENABLE_TELEMETRY=true
            TELEMETRY_URL=${TelemetryURL}
            CONTROL_PLANE_URL=${ControlPlaneURL}
            VPC_ID=${VpcId}
            PRIVATE_SUBNET_1_ID=${PrivateSubnet1Id}
            PRIVATE_SUBNET_2_ID=${PrivateSubnet2Id}
            DPS_ENABLE=true
            DPS_IMAGE_REPOSITORY=openflow-dataplane-service
            DPS_CHART_REPOSITORY=openflow-dataplane-service-chart
            DPS_CHART_VERSION=${DataPlaneServiceChartVersion}
            DPUI_IMAGE_REPOSITORY=openflow-dataplane-ui
            DPUI_CHART_REPOSITORY=openflow-dataplane-ui-chart
            DPUI_CHART_VERSION=${DataPlaneUIChartVersion}
            DPS_FEATURE_CONNECTOR_INSTALLATION=true
            USE_IMAGE_REGISTRY_OVER_PRIVATE_LINK=${UseImageRegistryOverPrivateLink}

            EOF

            chown ec2-user:ec2-user /home/ec2-user/.env
            chmod 644 /home/ec2-user/.env
            source /home/ec2-user/.env

            dnf update -y
            dnf install -y docker

            yum remove aws-cli -y
            curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
            unzip awscliv2.zip
            ./aws/install
            aws --version
            rm -rf awscliv2.zip aws/

            cat <<\EOF > /home/ec2-user/ensure-ecr-repo.sh
            #!/bin/bash
            source /home/ec2-user/.env

            instance_id=$(ec2-metadata -i | cut -d ' ' -f2)

            exists_check=$(aws ecr describe-repositories --region $AWS_REGION --repository-names $1 2>&1)
            if [ $? -ne 0 ]; then
              if echo $exists_check | grep -q RepositoryNotFoundException; then
                aws ecr create-repository --region $AWS_REGION --repository-name $1
              else
                >&2 echo $exists_check
              fi
            fi
            EOF

            SNOWFLAKE_CLI_ARCH=x86_64
            SNOWFLAKE_CLI_VERSION=3.8.3
            curl "https://sfc-repo.snowflakecomputing.com/snowflake-cli/linux_$SNOWFLAKE_CLI_ARCH/$SNOWFLAKE_CLI_VERSION/snowflake-cli-$SNOWFLAKE_CLI_VERSION.$SNOWFLAKE_CLI_ARCH.rpm" -o "snowflake-cli.rpm"
            rpm -i snowflake-cli.rpm

            systemctl start docker
            systemctl enable docker

            usermod -a -G docker ec2-user

            wget https://get.helm.sh/helm-v3.16.4-linux-amd64.tar.gz
            tar -zxvf helm-v3.16.4-linux-amd64.tar.gz
            mv ./linux-amd64/helm /usr/local/bin/helm
            rm -rf helm-v3.16.4-linux-amd64.tar.gz
            rm -rf linux-amd64

            cat <<\EOF > /home/ec2-user/.upgrade
            #!/bin/bash
            AGENT_IMAGE_VERSION_UPGRADE=
            OPERATOR_CHART_VERSION_UPGRADE=
            GATEWAY_IMAGE_VERSION_UPGRADE=
            DPS_CHART_VERSION_UPGRADE=
            DPUI_CHART_VERSION_UPGRADE=
            EOF

            cat <<\EOF > /home/ec2-user/sync-images.sh
            #!/bin/bash
            source /home/ec2-user/.env

            echo "Logging in to local private ECR..."
            aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $IMAGE_REGISTRY

            OAUTH_SECRET_ID="snowflake-oauth2-$UNIQUE_SUFFIX"

            oauth2_creds=$(aws secretsmanager get-secret-value --region $AWS_REGION --query SecretString --output text --secret-id $OAUTH_SECRET_ID)
            CLIENT_ID=$(echo $oauth2_creds | jq -r '.clientID')
            CLIENT_SECRET=$(echo $oauth2_creds | jq -r '.clientSecret')

            access_token=$(curl -s --request POST --url $SNOWFLAKE_OAUTH_TOKEN_URL \
              --header 'content-type: application/x-www-form-urlencoded' \
              --data-urlencode grant_type=client_credentials \
              --data-urlencode client_id=$CLIENT_ID \
              --data-urlencode client_secret=$CLIENT_SECRET | jq -r '.access_token')

            TOKEN_PATH="/home/ec2-user/access.token"
            echo -n $access_token > $TOKEN_PATH

            if [[ "$USE_IMAGE_REGISTRY_OVER_PRIVATE_LINK" == true ]];
              then
                echo "Connecting to image registry using a private link"
                private_link_param='--private-link';
              else
                echo "Connecting to image registry using a public endpoint"
                private_link_param='';
            fi

            SPCS_PARAMS="""-x \
              --account=$SF_ORG-$SF_ACCOUNT \
              --database=$SF_DB \
              --schema=$SF_SCHEMA \
              --host=$(echo $SNOWFLAKE_ACCOUNT_URL | sed 's/^https:\/\///') \
              --authenticator=OAUTH \
              --token-file-path=$TOKEN_PATH \
              --format=JSON
            """

            SF_REG=$(snow spcs image-registry url $private_link_param $SPCS_PARAMS | jq -r '.message')

            echo "Logging in to Snowflake Image Registry https://$SF_REG ..."
            echo $access_token | docker login "https://$SF_REG" \
              --username 0auth2accesstoken \
              --password-stdin

            echo $access_token | helm registry login "https://$SF_REG" \
              --username 0auth2accesstoken \
              --password-stdin

            echo "Discovering tags for repo $SF_IMG_REPO"
            latest_images=$(snow spcs image-repository list-images $SF_IMG_REPO $SPCS_PARAMS)

            rm $TOKEN_PATH

            echo "Found images\n$latest_images"

            docker_keep_only() {
              local image_name="$1"
              local image_tag="$2"

              docker images | grep "$image_name" | grep -v "$image_tag" | awk '{ print $3}' | sort | uniq | xargs docker rmi -f 2>/dev/null
            }

            sync_image() {
              local image_name="$1"
              local image_tag="$2"

              SNOW_IMAGE_PATH="$SF_DB/$SF_SCHEMA/$SF_IMG_REPO/$image_name:$image_tag"

              if [[ "$image_name" == *"-chart" ]]; then
                registryId=$(echo $IMAGE_REGISTRY | cut -d'.' -f1)
                foundImageLocalEcr="$(aws ecr describe-images --region $AWS_REGION --registry-id $registryId --repository-name "snowflake-openflow/$image_name" --image-ids=imageTag=$image_tag 2>&1)"
                if [ $? -ne 0 ]; then
                  echo "No $image_name:$image_tag found locally, so syncing"
                  helm pull oci://$SF_REG/$SF_DB/$SF_SCHEMA/$SF_IMG_REPO/$image_name --version $image_tag
                  helm push $image_name-$image_tag.tgz oci://$IMAGE_REGISTRY/snowflake-openflow
                  app_version=$(helm show values $image_name-$image_tag.tgz --jsonpath "{$.image.tag}")
                  if [[ $OPERATOR_CHART_REPOSITORY == $image_name ]]; then
                    echo "Pulling app $OPERATOR_IMAGE_REPOSITORY $app_version for helm chart $image_name $image_tag"
                    sync_image $OPERATOR_IMAGE_REPOSITORY $app_version
                  elif [[ $DPS_CHART_REPOSITORY == $image_name ]]; then
                    echo "Pulling app $DPS_IMAGE_REPOSITORY $app_version for helm chart $image_name $image_tag"
                    sync_image $DPS_IMAGE_REPOSITORY $app_version
                  elif [[ $DPUI_CHART_REPOSITORY == $image_name ]]; then
                    echo "Pulling app $DPUI_IMAGE_REPOSITORY $app_version for helm chart $image_name $image_tag"
                    sync_image $DPUI_IMAGE_REPOSITORY $app_version
                  fi
                  rm ./$image_name-$image_tag.tgz
                else
                  echo "Found $image_name:$image_tag in local ECR, so no need to re-sync"
                fi
              else
                LOCAL_IMAGE_PATH=$IMAGE_REGISTRY/snowflake-openflow/$image_name:$image_tag
                docker pull $SF_REG/$SNOW_IMAGE_PATH
                docker tag $SF_REG/$SNOW_IMAGE_PATH $LOCAL_IMAGE_PATH
                docker push $LOCAL_IMAGE_PATH
              fi
            }

            # If specific versions are set, make sure that we sync those
            if [ -n "$AGENT_IMAGE_VERSION" ]; then
              sync_image $AGENT_IMAGE_REPOSITORY $AGENT_IMAGE_VERSION
            fi

            if [ -n "$GATEWAY_IMAGE_VERSION" ]; then
              sync_image $GATEWAY_IMAGE_REPOSITORY $GATEWAY_IMAGE_VERSION
            fi

            if [ -n "$OPERATOR_CHART_VERSION" ]; then
              sync_image $OPERATOR_CHART_REPOSITORY $OPERATOR_CHART_VERSION
            fi

            if [ -n "$DPS_CHART_VERSION" ]; then
              sync_image $DPS_CHART_REPOSITORY $DPS_CHART_VERSION
            fi

            if [ -n "$DPUI_CHART_VERSION" ]; then
              sync_image $DPUI_CHART_REPOSITORY $DPUI_CHART_VERSION
            fi

            image_repos="$AGENT_IMAGE_REPOSITORY $EXTENSIONS_IMAGE_REPOSITORY $OPERATOR_IMAGE_REPOSITORY $OPERATOR_CHART_REPOSITORY $SERVER_IMAGE_REPOSITORY $DPS_IMAGE_REPOSITORY $DPS_CHART_REPOSITORY $DPUI_IMAGE_REPOSITORY $DPUI_CHART_REPOSITORY $GATEWAY_IMAGE_REPOSITORY"
            IFS=' '
            for image_name in $image_repos
            do
              # Find only the latest tag for this image
              latest_tag=$(echo "$latest_images" | jq -r ".[] | select(.image_name == \"$image_name\") | .tags" | sed -r 's/-/~/' | sort -Vr | sed -r 's/~/-/' | head -1)

              sync_image $image_name $latest_tag

              if [[ $AGENT_IMAGE_REPOSITORY == $image_name ]]; then
                if [ -z "$AGENT_IMAGE_VERSION" ]; then
                  echo "Setting Agent image version to $latest_tag"
                  sed -i'' -Ee "s|(AGENT_IMAGE_VERSION=).*|\1$latest_tag|g" /home/ec2-user/.env
                  docker_keep_only $image_name $latest_tag
                else
                  if [[ $AGENT_IMAGE_VERSION != $latest_tag ]]; then
                    sed -i'' -Ee "s|(AGENT_IMAGE_VERSION_UPGRADE=).*|\1$latest_tag|g" /home/ec2-user/.upgrade
                  fi
                  docker_keep_only $image_name $AGENT_IMAGE_VERSION
                fi
              elif [[ $GATEWAY_IMAGE_REPOSITORY == $image_name ]]; then
                if [ -z "$GATEWAY_IMAGE_VERSION" ]; then
                  echo "Setting Gateway image version to $latest_tag"
                  sed -i'' -Ee "s|(GATEWAY_IMAGE_VERSION=).*|\1$latest_tag|g" /home/ec2-user/.env
                  docker_keep_only $image_name $latest_tag
                else
                  if [[ $GATEWAY_IMAGE_VERSION != $latest_tag ]]; then
                    sed -i'' -Ee "s|(GATEWAY_IMAGE_VERSION_UPGRADE=).*|\1$latest_tag|g" /home/ec2-user/.upgrade
                  fi
                  docker_keep_only $image_name $GATEWAY_IMAGE_VERSION
                fi
              elif [[ $OPERATOR_CHART_REPOSITORY == $image_name ]]; then
                if [ -z "$OPERATOR_CHART_VERSION" ]; then
                  echo "Setting Operator chart version to $latest_tag"
                  sed -i'' -Ee "s|(OPERATOR_CHART_VERSION=).*|\1$latest_tag|g" /home/ec2-user/.env
                elif [[ $OPERATOR_CHART_VERSION != $latest_tag ]]; then
                  sed -i'' -Ee "s|(OPERATOR_CHART_VERSION_UPGRADE=).*|\1$latest_tag|g" /home/ec2-user/.upgrade
                fi
              elif [[ $DPS_CHART_REPOSITORY == $image_name ]]; then
                if [ -z "$DPS_CHART_VERSION" ]; then
                  echo "Setting DPS chart version to $latest_tag"
                  sed -i'' -Ee "s|(DPS_CHART_VERSION=).*|\1$latest_tag|g" /home/ec2-user/.env
                elif [[ $DPS_CHART_VERSION != $latest_tag ]]; then
                  sed -i'' -Ee "s|(DPS_CHART_VERSION_UPGRADE=).*|\1$latest_tag|g" /home/ec2-user/.upgrade
                fi
              elif [[ $DPUI_CHART_REPOSITORY == $image_name ]]; then
                if [ -z "$DPUI_CHART_VERSION" ]; then
                  echo "Setting DP UI chart version to $latest_tag"
                  sed -i'' -Ee "s|(DPUI_CHART_VERSION=).*|\1$latest_tag|g" /home/ec2-user/.env
                elif [[ $DPUI_CHART_VERSION != $latest_tag ]]; then
                  sed -i'' -Ee "s|(DPUI_CHART_VERSION_UPGRADE=).*|\1$latest_tag|g" /home/ec2-user/.upgrade
                fi
              else
                # Delete any other versions of image from local Docker cache
                docker_keep_only $image_name $latest_tag
              fi

            done
            unset IFS

            echo "Done syncing repositories!"
            EOF

            cat <<\EOF > /etc/systemd/system/openflow-sync-images.service
            [Unit]
            Description="Sync images for Snowflake Openflow"
            [Service]
            User=ec2-user
            Group=ec2-user
            TimeoutSec=600
            WorkingDirectory=/home/ec2-user
            ExecStart=/home/ec2-user/sync-images.sh
            [Install]
            WantedBy=multi-user.target
            EOF

            cat <<\EOF > /etc/systemd/system/openflow-sync-images.timer
            [Unit]
            Description="Run openflow-sync-images.service every 10 minutes relative to deactivation time"
            [Timer]
            OnBootSec=5min
            OnUnitInactiveSec=10min
            Unit=openflow-sync-images.service
            [Install]
            WantedBy=timers.target
            EOF

            cat <<\EOF > /home/ec2-user/create.sh
            #!/bin/bash
            source /home/ec2-user/.env
            sudo systemctl enable openflow-apply-infrastructure.timer
            sudo systemctl start openflow-apply-infrastructure.timer
            sudo systemctl enable openflow-apply-infrastructure.service
            sudo systemctl start openflow-apply-infrastructure.service
            sudo journalctl -u openflow-apply-infrastructure.service -f
            EOF

            cat <<\EOF > /home/ec2-user/apply-infrastructure.sh
            #!/bin/bash
            source /home/ec2-user/.env
            # Run in the foreground
            docker run --name apply_infra --log-driver=journald --env-file /home/ec2-user/.env --rm "$IMAGE_REGISTRY/snowflake-openflow/$AGENT_IMAGE_REPOSITORY:$AGENT_IMAGE_VERSION" /opt/openflow/create.sh
            EOF

            cat <<\EOF > /etc/systemd/system/openflow-apply-infrastructure.service
            [Unit]
            Description="Apply baseline Snowflake Openflow infrastructure configuration"
            [Service]
            User=ec2-user
            Group=ec2-user
            TimeoutSec=0
            WorkingDirectory=/home/ec2-user
            ExecStart=/home/ec2-user/apply-infrastructure.sh
            [Install]
            WantedBy=multi-user.target
            EOF

            cat <<\EOF > /etc/systemd/system/openflow-apply-infrastructure.timer
            [Unit]
            Description="Run openflow-apply-infrastructure.service every 10 minutes relative to deactivation time"
            [Timer]
            OnBootSec=5min
            OnUnitInactiveSec=10min
            Unit=openflow-apply-infrastructure.service
            [Install]
            WantedBy=timers.target
            EOF

            cat <<\EOF > /home/ec2-user/destroy.sh
            #!/bin/bash
            source /home/ec2-user/.env

            read -rp $'\nAre you sure you want to delete the Data Plane and all Runtimes present? [Y/n]: ' proceed
            if [ -z "$proceed" ]; then
              proceed="Y"
            fi
            [[ ! $proceed =~ ^[Yy]$ ]] && { echo -e "\nOperation canceled, aborting teardown procedure!"; echo; exit 1; }

            # Stop auto-apply of infrastructure baseline
            sudo systemctl disable openflow-apply-infrastructure.timer
            sudo systemctl stop openflow-apply-infrastructure.timer
            sudo systemctl disable openflow-apply-infrastructure.service
            sudo systemctl stop openflow-apply-infrastructure.service

            container_id=$(docker run --log-driver=journald --env-file /home/ec2-user/.env -d --rm "$IMAGE_REGISTRY/snowflake-openflow/$AGENT_IMAGE_REPOSITORY:$AGENT_IMAGE_VERSION" /bin/bash -c "/opt/openflow/cli/update-kubeconfig.sh; /opt/openflow/destroy.sh")
            docker logs -f $container_id
            EOF

            cat <<\EOF > /home/ec2-user/diagnostics.sh
            #!/bin/bash
            source /home/ec2-user/.env
            docker run -it --env-file /home/ec2-user/.env --rm -w "/opt/openflow/cli" "$IMAGE_REGISTRY/snowflake-openflow/$AGENT_IMAGE_REPOSITORY:$AGENT_IMAGE_VERSION" /bin/bash -c "/opt/openflow/cli/update-kubeconfig.sh; ./diagnostics.sh"
            EOF

            cat <<\EOF > /home/ec2-user/upgrade-data-plane.sh
            #!/bin/bash
            set -o errexit
            source ~/.env
            source ~/.upgrade

            images="[
              {\"repo\":\"$AGENT_IMAGE_REPOSITORY\",\"version_env\":\"AGENT_IMAGE_VERSION\"},
              {\"repo\":\"$DPS_CHART_REPOSITORY\",\"version_env\":\"DPS_CHART_VERSION\"},
              {\"repo\":\"$DPUI_CHART_REPOSITORY\",\"version_env\":\"DPUI_CHART_VERSION\"},
              {\"repo\":\"$GATEWAY_IMAGE_REPOSITORY\",\"version_env\":\"GATEWAY_IMAGE_VERSION\"},
              {\"repo\":\"$OPERATOR_CHART_REPOSITORY\",\"version_env\":\"OPERATOR_CHART_VERSION\"}
            ]"

            echo "$images" | jq -c -r '.[]' | while read item; do
              repo=$(jq -r '.repo' <<< $item)
              version_env=$(jq -r '.version_env' <<< $item)
              version="${!!version_env}"
              latestVersionVar="$version_env"_UPGRADE
              latestVersion="${!!latestVersionVar}"

              echo "$repo is set to version $version"
              if [[ -z $latestVersion || $version == $latestVersion ]]; then
                echo -e "\tNo upgrade is available"
              else
                sed -i'' -Ee "s|($version_env=).*|\1$latestVersion|g" /home/ec2-user/.env
                echo -e "\tUpgrade set to version $latestVersion"
              fi

              sed -i'' -Ee "s|(""$version_env""_UPGRADE=).*|\1|g" /home/ec2-user/.upgrade
            done

            echo -e "\n*****\nUpgrades are applied during the next cycle:"
            sudo systemctl list-timers openflow-apply-infrastructure | head -n 2
            EOF

            chmod +x /home/ec2-user/*.sh
            chown ec2-user:ec2-user /home/ec2-user/*.sh

            image_repos="$AGENT_IMAGE_REPOSITORY $EXTENSIONS_IMAGE_REPOSITORY $OPERATOR_IMAGE_REPOSITORY $OPERATOR_CHART_REPOSITORY $SERVER_IMAGE_REPOSITORY $DPS_IMAGE_REPOSITORY $DPS_CHART_REPOSITORY $DPUI_IMAGE_REPOSITORY $DPUI_CHART_REPOSITORY $GATEWAY_IMAGE_REPOSITORY"
            IFS=' '
            for i in $image_repos
            do
              /home/ec2-user/ensure-ecr-repo.sh snowflake-openflow/$i
            done
            unset IFS

            /home/ec2-user/sync-images.sh

            systemctl daemon-reload

            systemctl enable openflow-sync-images.timer
            systemctl start openflow-sync-images.timer
            systemctl enable openflow-sync-images.service
            systemctl start openflow-sync-images.service

            systemctl enable openflow-apply-infrastructure.timer
            systemctl start openflow-apply-infrastructure.timer
            systemctl enable openflow-apply-infrastructure.service
            systemctl start openflow-apply-infrastructure.service

          - AWSAccountId: !Ref AWS::AccountId
            AWSRegion: !Ref AWS::Region
            TerraformStateBucket: !Ref TerraformStateBucket
            DataPlaneKey: !Ref DataPlaneKey
            DataPlaneUUID: !Ref DataPlaneUUID
            DataPlaneURL: !Ref DataPlaneURL
            AgentImageVersion: !Ref AgentImageVersion
            DataPlaneServiceChartVersion: !Ref DataPlaneServiceChartVersion
            DataPlaneUIChartVersion: !Ref DataPlaneUIChartVersion
            RuntimeOperatorChartVersion: !Ref RuntimeOperatorChartVersion
            RuntimeGatewayVersion: !Ref RuntimeGatewayVersion
            SnowflakeOrganization: !Ref SnowflakeOrganization
            SnowflakeAccount: !Ref SnowflakeAccount
            SnowflakeDatabase: !Ref SnowflakeDatabase
            SnowflakeSchema: !Ref SnowflakeSchema
            SnowflakeImageRepository: !Ref SnowflakeImageRepository
            SnowflakeAccountURL: !Ref SnowflakeAccountURL
            SnowflakeOAuthIntegrationName: !Ref SnowflakeOAuthIntegrationName
            TelemetryURL: !Ref TelemetryURL
            ControlPlaneURL: !Ref ControlPlaneURL
            VpcId: !Ref InfraVPC
            PrivateSubnet1Id: !Ref PrivateSubnet1
            PrivateSubnet2Id: !Ref PrivateSubnet2
            UseImageRegistryOverPrivateLink: !Ref UseImageRegistryOverPrivateLink

※本来のものから間接参照その他を修正しています。

課金について

便利な Openflow ですが、課金には注意が必要です。

Openflow の課金対象要素は以下の通りです。

種類 説明
サービス Openflow のアクティブなコネクタランタイムに対して発生する費用
基盤 AWS の場合、以下で費用が発生する。
  • VPC:基盤となるネットワーク
  • EC2:デプロイメント、ランタイムで必要となるリソース
  • EKS:ランタイムで必要となるリソース
  • S3:ログの出力先
取り込み データの取り込みに対して発生する費用
テレメトリー イベントのログやメトリクスに対して発生する費用

特に注意が必要なのは基盤部分です。

ec2-compute-reqs.png

画像:Understanding Openflow costs | Snowflake Documentationより引用

公式ドキュメントにもあるように、ランタイムを作成した時点で以下の3種類のインスタンスが起動します。

  • エージェントインスタンス(コンテナイメージの最新化、インフラ構築)
  • 管理コンテナ(Openflowアプリケーション全体の管理)
  • ランタイムコンテナ(レプリケーション処理実行)

各インスタンスはデフォルトで常時稼働状態であるため、使用しない時間帯は停止するなどして、課金の発生を抑える必要があります。

image-14.png

例:PostgreSQL (RDS) の100GBのデータを1か月間継続ロードした場合

  • サービス:0.0225 * 1 * 24 * 30 * $4.30 = $69.66
  • 基盤
    • VPC:NATゲートウェイの合計データ処理量が200GBと仮定して、$0.062 * 24 * 30 + 200 * $0.062 = $57.04
    • EC2:
    • t3.medium:$0.0136 * 24 * 30 = $9.792
    • m7i.xlarge:$0.2604 * 24 * 30 = $187.488
    • m7i.2xlarge:$0.5208 * 24 * 30 = $374.976
    • EKS:クラスター1つに対し、$0.10 * 24 * 30 = $72
    • S3:ログの最大出力量を1GBとして、$0.025 * 1 = $0.025
  • 取り込み:Snowpipe Streaming で合計データ処理量200GBと仮定して、0.0037 * 200 * $4.30 = $3.182
  • テレメトリー:ログの最大出力量を1GBとして、0.02 * 1 * $4.30 = $0.086

上記を合計すると、$774.249 ≒ 108,395円(1ドル140円)

クレジットの詳細:CreditConsumptionTable.pdf

計算例でもわかる通り、EC2部分の料金が大半を占めています。最小の構成でもそこそこ大きなインスタンスが立ってしまうので、注意する必要があります。

さいごに

Openflow の使い方について、詳しくご紹介しました。

Openflow は多くのコネクタが提供されており、様々なデータソースを一元的に集約して Snowflake に取り込むことができる便利なサービスですが、デフォルトである程度パワフルな構成になっているため、想像以上に課金が発生する点については注意が必要です。

特定のデータソースの連携が目的であれば、こちらの記事でもご紹介しているように Snowpipe Streaming といったサービスを使う方法もあります。ご利用のユースケースに応じて、最適な方法を選択するようにしてください。

参考

この記事をシェアする

  • Facebook
  • X
  • Pocket
  • Line
  • Hatena
  • Linkedin

資料請求・お問い合わせはこちら

ページトップへ戻る