Snowpipe Streaming は Snowpipe の補完を目的とした機能であり、Snowpipe で対応できないストリーミングデータの取り込みを行うことができます。Snowpipe Streaming は、以下の2つの方法で利用できます。
- Java SDKを利用したクライアントアプリケーションからのデータストリーミング
- Kafkaコネクタ(Snowflake Connector for Kafka)を用いた Kafka からのデータストリーミング
以降では、Java SDKを利用したクライアントアプリケーションからのデータストリーミングの詳細についてまとめます。
Kafkaコネクタを用いた Kafka からのデータストリーミングについては、こちらの記事をご参照ください。
※検証の際、操作はすべてACCOUNTADMIN
ロールで行っています。
準備
Snowpipe Streaming は現時点で Java SDK のみ対応しており、Kafkaコネクタを使用しない場合は Java でクライアントアプリケーションを構築する必要があります。
そのため、最初に Java の開発環境をセットアップしておきます。
Pleiades All in One ダウンロード
Pleiades All in One は Java の統合開発環境である Eclipse と、関連する便利なプラグインがまとめられたパッケージです。公式ページから最新のバージョンのもの(執筆時は Eclipse 2025)を選択し、ダウンロードします。
インストール
ダウンロードしたファイルをダブルクリックします。
※警告が出た場合は、詳細情報を開いて「実行」をクリックします。
ポップアップが表示されたら「解凍」をクリックします。
Eclipse 起動確認
インストール先のディレクトリで「eclipse」フォルダを開き、「eclipse.exe」を実行します。
ワークスペースディレクトリはデフォルトのままとし、起動をクリックします。
Eclipse が起動した状態になりました。
以上で Java の開発環境構築は完了です。
手順
以下の手順に従い、各種設定とクライアントアプリケーションの構築を行います。
1. 認証設定
キーペア作成
クライアントアプリケーションから Snowflake に対して通信を行うにあたり、認証用のRSAキーペアを作成する必要があります。Linux 環境でターミナルから以下のコマンドを実行します。
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
これにより、以下のような秘密鍵 rsa_key.p8 が生成されます。
-----BEGIN PRIVATE KEY-----
MIIEvgIB...
-----END PRIVATE KEY-----
※rsa_key.p8 は第三者に見られることがないよう、安全な場所に保管します。
また、以下のコマンドで公開鍵を作成します。
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
公開鍵の中身は以下のようになっています。
-----BEGIN PUBLIC KEY-----
MIIBIjAN...
-----END PUBLIC KEY-----
公開鍵登録
作成した公開鍵を Snowflake ユーザに割り当てます。
Snowflake のワークシートで以下のようなSQLを実行します。
ALTER USER example_user SET RSA_PUBLIC_KEY='MIIBIjAN...';
※example_user
とRSA_PUBLIC_KEY
の値はご自身のものを指定してください。
補足
サポートされているツールで生成されたキーペアでない場合、キー登録時に「Invalid Public key」のようなエラーが出ます。Windows の場合は OpenSSH をダウンロードしてキーペアを作成するか、上記のように WSL 等の Linux 環境で生成したキーペアを使用してください。
2. テーブル作成
データ挿入対象のテーブルを作成しておきます。
CREATE OR REPLACE TABLE TEST.PUBLIC.snowpipe_streaming_test (
id NUMBER(38,0),
channel_name VARCHAR
);
3. プロジェクト設定
Eclipse を開き、「ファイル」→「新規」→「Maven プロジェクト」をクリックします。
「シンプルなプロジェクトの作成」にチェックを入れて「次へ」をクリックします。
グループIDとアーティファクトIDを以下のように入力し、「完了」をクリックします。
プロジェクトが開いたら pom.xml をクリックし、中身を以下のように編集し上書き保存します。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>snowpipe-streaming</artifactId>
<version>0.0.1-SNAPSHOT</version>
<!-- 文字コードとJavaのバージョンの設定 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>
<!-- 依存関係の設定 -->
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.3.4</version>
</dependency>
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>3.1.1</version>
</dependency>
</dependencies>
<!-- プラグインの設定 -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
※ pom.xml はプロジェクト設定ファイルであり、プロジェクト管理ツールである Maven が pom.xml の設定内容をもとに依存関係に記載されたパッケージや必要なプラグインのインストールを自動で行います。
pom.xml の内容を反映させるため、対象のプロジェクトを選択した状態で「実行」→「Maven install」をクリックします。
するとインストール処理が進み、コンソールに以下のようなログが表示されます。
「BUILD SUCCESS」と出ていればOKです。
4. 認証情報ファイル作成
対象プロジェクトを右クリックし「新規」→「ファイル」をクリックします。
出てきたポップアップで、ファイル名を「profile.json」と指定して「完了」をクリックします。
ファイルが作成されたら、以下のように編集し保存します。
{
"user": "<user_name>",
"url": "https://<account_id>.snowflakecomputing.com:443",
"private_key": "<private_key>",
"port": 443,
"host": "<account_id>.snowflakecomputing.com",
"scheme": "https",
"role": "<role_name>"
}
各項目に必要な情報は、Snowflake メニュー最下部から「アカウント」→「アカウントの詳細を表示する」で確認可能です。
<user_name>
は「ユーザ名」、<account_id>
は「アカウント識別子」、<role_name>
は「ロール」を指定します。
<private_key>
には「1. 認証設定」で作成した秘密鍵を以下のように1行で記入します。
"private_key": "MIIEvgIB..."
5. アプリケーション作成
プロジェクトの src > main > java を開き、右クリックで「新規」→「ファイル」をクリックします。
出てきたポップアップで、ファイル名を「SnowflakeStreamingIngest.java」と指定して「完了」をクリックします。
ファイルが作成されたら、以下のコードをコピー&ペーストして上書き保存します。
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
*/
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
public class SnowflakeStreamingIngest {
private static String PROFILE_PATH = "profile.json";
private static final ObjectMapper mapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
Properties props = new Properties();
Iterator<Map.Entry<String, JsonNode>> propIt =
mapper.readTree(new String(Files.readAllBytes(Paths.get(PROFILE_PATH)))).fields();
while (propIt.hasNext()) {
Map.Entry<String, JsonNode> prop = propIt.next();
props.put(prop.getKey(), prop.getValue().asText());
}
try (SnowflakeStreamingIngestClient client =
SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT").setProperties(props).build()) {
OpenChannelRequest request1 =
OpenChannelRequest.builder("CHANNEL_1")
.setDBName("TEST")
.setSchemaName("PUBLIC")
.setTableName("snowpipe_streaming_test")
.setOnErrorOption(
OpenChannelRequest.OnErrorOption.CONTINUE)
.build();
// 通信用のチャネルオープン
SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1);
// 行単位でのデータ挿入
final int totalRowsInTable = 10;
for (int val = 0; val < totalRowsInTable; val++) {
Map<String, Object> row = new HashMap<>();
// テーブルの各列にデータを入れる
row.put("id", val);
row.put("channel_name", "CHANNEL_1");
// 挿入操作の検証用レスポンス
InsertValidationResponse response = channel1.insertRow(row, String.valueOf(val));
if (response.hasErrors()) {
// エラーがあった場合、例外を投げる
throw response.getInsertErrors().get(0).getException();
}
}
final int expectedOffsetTokenInSnowflake = totalRowsInTable - 1;
final int maxRetries = 10;
int retryCount = 0;
// オフセットトークンの確認
do {
String offsetTokenFromSnowflake = channel1.getLatestCommittedOffsetToken();
if (offsetTokenFromSnowflake != null
&& offsetTokenFromSnowflake.equals(String.valueOf(expectedOffsetTokenInSnowflake))) {
System.out.println("SUCCESSFULLY inserted " + totalRowsInTable + " rows");
break;
}
retryCount++;
} while (retryCount < maxRetries);
// チャネルクローズ
channel1.close().get();
}
}
}
6. 動作確認
「SnowflakeStreamingIngest.java」を右クリックし、「実行」→「Java アプリケーション」をクリックします。
実行後に Snowflake でテーブルを確認すると、データが取り込まれていることが確認できます。
補足
以下のSQLコマンドを実行すると、データ送信の際に作成されたチャネルの詳細情報を確認できます。
SHOW CHANNELS;
また、以下のコマンドにより、Snowpipe Streaming のデータ取り込み履歴を確認可能です。
SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY
ORDER BY 1;
データ取り込み履歴が1時間単位で記録されており、消費クレジットや送信データのバイト数、処理対象の行数などが表示されています。