Snowpipe Streaming は Snowpipe の補完を目的とした機能であり、Snowpipe で対応できないストリーミングデータの取り込みを行うことができます。Snowpipe Streaming は、以下の2つの方法で利用できます。
- Java SDKを利用したクライアントアプリケーションからのデータストリーミング
- Kafkaコネクタ(Snowflake Connector for Kafka)を用いた Kafka からのデータストリーミング
以降では、Azure Event Hubs とKafkaコネクタを用いたデータストリーミングの詳細についてまとめます。
Java SDKを利用したクライアントアプリケーションからのデータストリーミングについては、こちらの記事をご参照ください。
※検証の際、操作はすべてACCOUNTADMIN
ロールで行っています。
手順
以下の手順に従い、各種設定を行います。
1. 認証設定
キーペア作成
クライアントアプリケーションから Snowflake に対して通信を行うにあたり、認証用のRSAキーペアを作成する必要があります。Linux 環境でターミナルから以下のコマンドを実行します。
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
これにより、以下のような秘密鍵 rsa_key.p8 が生成されます。
-----BEGIN PRIVATE KEY-----
MIIEvgIB...
-----END PRIVATE KEY-----
※rsa_key.p8 は第三者に見られることがないよう、安全な場所に保管します。
また、以下のコマンドで公開鍵を作成します。
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
公開鍵の中身は以下のようになっています。
-----BEGIN PUBLIC KEY-----
MIIBIjAN...
-----END PUBLIC KEY-----
公開鍵登録
作成した公開鍵を Snowflake ユーザに割り当てます。
Snowflake のワークシートで以下のようなSQLを実行します。
ALTER USER example_user SET RSA_PUBLIC_KEY='MIIBIjAN...';
※example_user
とRSA_PUBLIC_KEY
の値はご自身のものを指定してください。
補足
サポートされているツールで生成されたキーペアでない場合、キー登録時に「Invalid Public key」のようなエラーが出ます。Windows の場合は OpenSSH をダウンロードしてキーペアを作成するか、上記のように WSL 等の Linux 環境で生成したキーペアを使用してください。
2. Kafkaコネクタ設定
Azure Event Hubs と Kafka のセットアップについては、こちらの記事をご参照ください。
セットアップが完了したら、Kafkaコネクタの設定を行います。
Kafkaコネクタダウンロード
ターミナルで以下のコマンドを実行し、Kafkaコネクタと必要なjarをダウンロードします。
$ cd kafka_2.13-3.9.0/libs/
$ curl https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/2.2.1/snowflake-kafka-connector-2.2.1.jar --output snowflake-kafka-connector-2.2.1.jar
$ curl https://repo1.maven.org/maven2/net/snowflake/snowflake-ingest-sdk/2.1.0/snowflake-ingest-sdk-2.1.0.jar --output snowflake-ingest-sdk-2.1.0.jar
$ curl https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.14.5/snowflake-jdbc-3.14.5.jar --output snowflake-jdbc-3.14.5.jar
$ curl https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar --output bc-fips-1.0.1.jar
$ curl https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar --output bcpkix-fips-1.0.3.jar
※各ライブラリのバージョンは継続的に更新されているためご注意ください。上記は検証時点で問題なく機能したバージョンの組み合わせとなっています。
設定ファイル作成
下記コマンドで kafka-cli ディレクトリに移動します。
$ cd ~/azure-event-hubs-for-kafka/quickstart/kafka-cli/
Kafkaコネクタをスタンドアロンモードで起動するため、以下の内容を記載した connect-standalone.properties ファイルを作成します。
bootstrap.servers=<Event Hubs namespace>.servicebus.windows.net:9093
# Snowflake value conversion
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
plugin.path=<kafka_home>/libs
以下の項目はご自身の環境のものを設定してください。
※括弧内のデータは今回の設定例
<Event Hubs namespace>
:Event Hubs名前空間(eh-snowflake-je
)<kafka_home>
:Kafkaをインストールしたディレクトリのパス(/home/tani/kafka_2.13-3.9.0
)
また、以下の内容を記載した SF_connect.properties ファイルを作成します。
name=kafka_snowpipe_streaming
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=1
topics=<topic_name>
snowflake.topic2table.map=<topic_name>:<table_name>
buffer.count.records=10000
buffer.flush.time=5
buffer.size.bytes=20000000
snowflake.url.name=<account_id>.snowflakecomputing.com:443
snowflake.user.name=<user_name>
snowflake.private.key=<private_key>
snowflake.database.name=<db_name>
snowflake.schema.name=<schema_name>
snowflake.role.name=<role_name>
snowflake.ingestion.method=SNOWPIPE_STREAMING
snowflake.enable.schematization=false
value.converter.schemas.enable=false
jmx=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
errors.tolerance=all
以下の項目はご自身の環境のものを指定してください。
※括弧内のデータは今回の設定例
<topic_name>
:Kafkaトピック名(test-topic
)<account_id>
:アカウント識別子<user_name>
:ユーザ名(TANI
)<private_key>
:秘密鍵(MIIEvgIB...
)<table_name>
:対象テーブル名(snowpipe_streaming_kafka
)<db_name>
:対象データベース名(TEST
)<schema_name>
:対象スキーマ名(PUBLIC
)<role_name>
:ロール名(ACCOUNTADMIN
)
テーブルが存在しない場合、Kafkaコネクターが<table_name>
で指定されたテーブルを自動的に作成するため、事前に作成しておく必要はありません。
<user_name>
, <account_id>
, <role_name>
に必要な情報は、Snowflake メニュー最下部から「アカウント」→「アカウントの詳細を表示する」で確認可能です。
<private_key>
には「1. 認証設定」で作成した秘密鍵を以下のように1行で記入します。
snowflake.private.key=MIIEvgIB...
上記設定後の kafka-cli フォルダ内部は以下の状態です。
3. Kafkaコネクタ起動
起動前に忘れずに環境変数の設定をしておきます。(セッションをまたいで使用できるよう、.bashrc に追記しておくと便利です)
$ export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
$ export KAFKA_INSTALL_HOME=[Kafka をインストールしたディレクトリ(例:/home/tani/kafka_2.13-3.9.0)]
以下のコマンドでKafkaコネクタを起動します。
$ $KAFKA_INSTALL_HOME/bin/connect-standalone.sh connect-standalone.properties SF_connect.properties
別のターミナルを開き、以下のように Producer を起動してメッセージを送信します。
$ $KAFKA_INSTALL_HOME/bin/kafka-console-producer.sh --topic test-topic --broker-list eh-snowflake-je.servicebus.windows.net:9093 --producer.config client_common.properties
>aaa
>bbb
>ccc
上記の実行後に Snowflake を確認すると、テーブルが作成されデータが取り込まれていることが確認できます。