レック・テクノロジー・コンサルティング株式会社TECH BLOG

Azure Event Hubs のセットアップ方法&ストリーミング基本操作

Apache Kafka を利用可能なマネージド型のストリーミングサービスである Azure Event Hubs のセットアップ方法とストリーミングの基本的な操作についてご紹介します。

セットアップ手順

1. Event Hubs 名前空間の作成

ホームから Event Hubs に移動します。

image.png

「Event Hubs 名前空間の作成」をクリックします。

image-1.png

基本タブで以下の項目を設定します。

  • サブスクリプション:対象のサブスクリプション
  • リソースグループ:対象のリソースグループ
  • 名前空間の名前:任意の名前
  • 場所:指定のリージョン
  • 価格レベル:標準(Basic では Kafka がサポートされないため)
  • スループットユニット:1
  • 自動インフレを有効にする:なし

※スループットユニットはデータ処理能力を表す指標です。今回は検証目的のため、課金を抑えた設定にしています。

その他はデフォルトのままとします。

image-2.png

「確認と作成」タブに移り、「作成」をクリックします。

image-3.png

以上で Event Hubs の作成は完了です。

後ほど利用するため、Event Hubs 名前空間の「設定」→「共有アクセスポリシー」→「RootManageSharedAccessKey」から Primary connection string を控えておきます。

image-4.png

2. JRE インストール

Java 製の Kafka を実行できるよう、JRE をインストールしておきます。

$ sudo apt update
$ sudo apt install default-jre

3. Kafka インストール

Kafka のインストールを行います。

※以降の処理はローカル環境のWSL(Ubuntu 22.04.3 LTS)上で実施していますが、VM を作成して実施しても問題ありません。

はじめに Kafka をインストールします。以下のコマンドを実行します。

$ curl -OL https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
$ tar -xzf kafka_2.13-3.9.0.tgz

これで Kafka のインストールは完了です。

4. Kafka CLI 導入

Event Hubs に対してメッセージを送受信できるようにするため、以下のコマンドを実行し Kafka CLI を導入します。

$ git clone https://github.com/Azure/azure-event-hubs-for-kafka.git

下記コマンドで kafka-cli ディレクトリに移動します。

$ cd azure-event-hubs-for-kafka/quickstart/kafka-cli

jaas.conf に先ほど取得した「Primary connection string」を書き込んで上書き保存します。

KafkaClient { 
    org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<Primary connection string>";
};

5. 動作確認

まず、以下のように環境変数を設定しておきます。

$ export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
$ export KAFKA_INSTALL_HOME=[Kafka をインストールしたディレクトリ(例:/home/tani/kafka_2.13-3.9.0)]

Producer 接続

Producer を起動し、メッセージを送信してみます。

$ $KAFKA_INSTALL_HOME/bin/kafka-console-producer.sh --topic test-topic --broker-list <名前空間名>.servicebus.windows.net:9093 --producer.config client_common.properties

> aaa
> bbb
> ccc

トピックは事前に作成していなくても上記コマンドを実行した時点で自動的に作成されます。

Consumer 起動

別ターミナルを開き、環境変数を設定したのち Consumer を起動します。

$ $KAFKA_INSTALL_HOME/bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server <名前空間名>.servicebus.windows.net:9093 --consumer.config client_common.properties

aaa
bbb
ccc

上記により、メッセージを送受信できていることが確認できます。

image-5.png

Event Hubs 名前空間の概要でも送受信の記録を確認することが可能です。

追記

上記ではコマンドによるメッセージ送受信の例をご紹介しましたが、confluent-kafka-python等のライブラリを利用してプログラムからメッセージ送受信を行うことも可能です。

以下は「Primary connection string」を使って認証を行い、AdminClientを利用してトピックの一覧を表示するPythonプログラムの例です。

from confluent_kafka.admin import AdminClient

def main():
    conf = {
        'bootstrap.servers': '<名前空間名>:9093',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': '<Primary connection string>'
    }
    ac = AdminClient(conf)

    cluster_metadata = ac.list_topics()
    print(f"topics: {cluster_metadata.topics}")

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"Error: {e}")

# 実行結果
# topics: {'test-topic': TopicMetadata(test-topic, 1 partitions)}

参考

この記事をシェアする

  • Facebook
  • X
  • Pocket
  • Line
  • Hatena
  • Linkedin

資料請求・お問い合わせはこちら

ページトップへ戻る