レック・テクノロジー・コンサルティング株式会社TECH BLOG

Snowpipe Streaming によるストリーミングデータ取り込み方法解説(Kafkaコネクタ利用)

Snowpipe Streaming は Snowpipe の補完を目的とした機能であり、Snowpipe で対応できないストリーミングデータの取り込みを行うことができます。Snowpipe Streaming は、以下の2つの方法で利用できます。

  • Java SDKを利用したクライアントアプリケーションからのデータストリーミング
  • Kafkaコネクタ(Snowflake Connector for Kafka)を用いた Kafka からのデータストリーミング

以降では、Azure Event Hubs とKafkaコネクタを用いたデータストリーミングの詳細についてまとめます。
Java SDKを利用したクライアントアプリケーションからのデータストリーミングについては、こちらの記事をご参照ください。

※検証の際、操作はすべてACCOUNTADMINロールで行っています。

手順

以下の手順に従い、各種設定を行います。

1. 認証設定

キーペア作成

クライアントアプリケーションから Snowflake に対して通信を行うにあたり、認証用のRSAキーペアを作成する必要があります。Linux 環境でターミナルから以下のコマンドを実行します。

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

これにより、以下のような秘密鍵 rsa_key.p8 が生成されます。

-----BEGIN PRIVATE KEY-----
MIIEvgIB...
-----END PRIVATE KEY-----

※rsa_key.p8 は第三者に見られることがないよう、安全な場所に保管します。

また、以下のコマンドで公開鍵を作成します。

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

公開鍵の中身は以下のようになっています。

-----BEGIN PUBLIC KEY-----
MIIBIjAN...
-----END PUBLIC KEY-----

公開鍵登録

作成した公開鍵を Snowflake ユーザに割り当てます。
Snowflake のワークシートで以下のようなSQLを実行します。

ALTER USER example_user SET RSA_PUBLIC_KEY='MIIBIjAN...';

example_userRSA_PUBLIC_KEYの値はご自身のものを指定してください。

補足

サポートされているツールで生成されたキーペアでない場合、キー登録時に「Invalid Public key」のようなエラーが出ます。Windows の場合は OpenSSH をダウンロードしてキーペアを作成するか、上記のように WSL 等の Linux 環境で生成したキーペアを使用してください。

2. Kafkaコネクタ設定

Azure Event Hubs と Kafka のセットアップについては、こちらの記事をご参照ください。
セットアップが完了したら、Kafkaコネクタの設定を行います。

Kafkaコネクタダウンロード

ターミナルで以下のコマンドを実行し、Kafkaコネクタと必要なjarをダウンロードします。

$ cd kafka_2.13-3.9.0/libs/
$ curl https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/2.2.1/snowflake-kafka-connector-2.2.1.jar --output snowflake-kafka-connector-2.2.1.jar
$ curl https://repo1.maven.org/maven2/net/snowflake/snowflake-ingest-sdk/2.1.0/snowflake-ingest-sdk-2.1.0.jar --output snowflake-ingest-sdk-2.1.0.jar
$ curl https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.14.5/snowflake-jdbc-3.14.5.jar --output snowflake-jdbc-3.14.5.jar
$ curl https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar --output bc-fips-1.0.1.jar
$ curl https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar --output bcpkix-fips-1.0.3.jar

※各ライブラリのバージョンは継続的に更新されているためご注意ください。上記は検証時点で問題なく機能したバージョンの組み合わせとなっています。

設定ファイル作成

下記コマンドで kafka-cli ディレクトリに移動します。

$ cd ~/azure-event-hubs-for-kafka/quickstart/kafka-cli/

Kafkaコネクタをスタンドアロンモードで起動するため、以下の内容を記載した connect-standalone.properties ファイルを作成します。

bootstrap.servers=<Event Hubs namespace>.servicebus.windows.net:9093

# Snowflake value conversion
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
plugin.path=<kafka_home>/libs

以下の項目はご自身の環境のものを設定してください。
※括弧内のデータは今回の設定例

  • <Event Hubs namespace>:Event Hubs名前空間(eh-snowflake-je
  • <kafka_home>:Kafkaをインストールしたディレクトリのパス(/home/tani/kafka_2.13-3.9.0

また、以下の内容を記載した SF_connect.properties ファイルを作成します。

name=kafka_snowpipe_streaming
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=1
topics=<topic_name>
snowflake.topic2table.map=<topic_name>:<table_name>
buffer.count.records=10000
buffer.flush.time=5
buffer.size.bytes=20000000
snowflake.url.name=<account_id>.snowflakecomputing.com:443
snowflake.user.name=<user_name>
snowflake.private.key=<private_key>
snowflake.database.name=<db_name>
snowflake.schema.name=<schema_name>
snowflake.role.name=<role_name>
snowflake.ingestion.method=SNOWPIPE_STREAMING
snowflake.enable.schematization=false
value.converter.schemas.enable=false
jmx=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
errors.tolerance=all

以下の項目はご自身の環境のものを指定してください。
※括弧内のデータは今回の設定例

  • <topic_name>:Kafkaトピック名(test-topic
  • <account_id>:アカウント識別子
  • <user_name>:ユーザ名(TANI
  • <private_key>:秘密鍵(MIIEvgIB...
  • <table_name>:対象テーブル名(snowpipe_streaming_kafka
  • <db_name>:対象データベース名(TEST
  • <schema_name>:対象スキーマ名(PUBLIC
  • <role_name>:ロール名(ACCOUNTADMIN

テーブルが存在しない場合、Kafkaコネクターが<table_name>で指定されたテーブルを自動的に作成するため、事前に作成しておく必要はありません。

<user_name>, <account_id>, <role_name>に必要な情報は、Snowflake メニュー最下部から「アカウント」→「アカウントの詳細を表示する」で確認可能です。

image.png

image-1.png

<private_key>には「1. 認証設定」で作成した秘密鍵を以下のように1行で記入します。

snowflake.private.key=MIIEvgIB...

上記設定後の kafka-cli フォルダ内部は以下の状態です。

image-2.png

3. Kafkaコネクタ起動

起動前に忘れずに環境変数の設定をしておきます。(セッションをまたいで使用できるよう、.bashrc に追記しておくと便利です)

$ export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
$ export KAFKA_INSTALL_HOME=[Kafka をインストールしたディレクトリ(例:/home/tani/kafka_2.13-3.9.0)]

以下のコマンドでKafkaコネクタを起動します。

$ $KAFKA_INSTALL_HOME/bin/connect-standalone.sh connect-standalone.properties SF_connect.properties

別のターミナルを開き、以下のように Producer を起動してメッセージを送信します。

$ $KAFKA_INSTALL_HOME/bin/kafka-console-producer.sh --topic test-topic --broker-list eh-snowflake-je.servicebus.windows.net:9093 --producer.config client_common.properties

>aaa
>bbb
>ccc

image-3.png

上記の実行後に Snowflake を確認すると、テーブルが作成されデータが取り込まれていることが確認できます。

image-4.png

参考

この記事をシェアする

  • Facebook
  • X
  • Pocket
  • Line
  • Hatena
  • Linkedin

資料請求・お問い合わせはこちら

ページトップへ戻る