システム間の大量のデータ連携やストリーミング処理で活用されるKafkaのマネージドサービスであるAWS MSKについて、セットアップからメッセージ送受信までの流れを具体的に見ていきます。
はじめに:Kafkaとは
MSKについて説明する前に、MSKの本体であるApache Kafkaについて簡単にご説明します。
Kafkaはストリーミングデータのリアルタイム処理に特化した分散データストアで、複数の送信側のシステムと受信側のシステムの間で非同期にデータを仲介する役割を果たします。私たちの日常で例えれば、好きなタイミングで手紙をポストに投函すれば的確に送り先に届けてくれる、郵便局のような存在ですね。
ユースケース
Kafkaのユースケースは以下の2つです。
- 複数システム間を繋ぐデータハブの構築
- ストリーミングアプリケーションの構築
Kafkaはクラスタを利用した分散処理機構と冗長構成によって高いスループットと可用性を兼ね備えています。そのため、複数のシステム間で大量のデータを受け渡しするためのハブとして活用されたり、IoTセンサー等から送られた大量のデータを処理するストリーミングアプリケーションの構築に活用されたりしています。
構成要素
Broker, ZooKeeper
KafkaはBrokerクラスタとZooKeeperクラスタで構成されています。Brokerクラスタは各Brokerに分散配置されたPartitionという単位でメッセージの管理を行い、ZooKeeperクラスタが各Broker間の連携を取ります。
Topic, Partition
Topicはメッセージの送受信で利用される論理的なチャンネルです。ニュースアプリで例えると、「スポーツ」、「政治経済」、「テクノロジー」などのカテゴリを表すものです。Topicは複数のPartitionで構成されます。
Partitionは名前の通り「区切り」を意味するもので、Topic内のメッセージを分けて格納するための入れ物です。
Publisher, Subscriber
冒頭の例で、Kafkaは郵便局のような存在だと書きました。Kafkaを郵便局としたとき、手紙を送る側がPublisher、受け取る側がSubscriberとなります。KafkaのTopicに対して送信側システム(Publisher)がメッセージを送り、受信側システム(Subscriber)が対象のTopicからメッセージを受け取ります。
KafkaのTopicに対してメッセージを送受信するためにPublisher側ではProducerと呼ばれる書き込み用ライブラリを利用し、Subscriber側ではConsumerと呼ばれる読み出し用ライブラリを利用します。
AWS MSKとは
AWSのApache Kafkaのマネージドサービスで、正式名称は「Managed Streaming for Apache Kafka」です。
自前でKafkaを用意しようとすると一からサーバを立てて、設定をして、メンテナンスをして...と管理が少し大変ですが、MSKはKafkaの実行基盤をすべてAWS側が担当し、デフォルトで以下のような機能を提供してくれます。
- 暗号化通信
- メトリクス監視
- 自動スケーリング
- セキュリティパッチ適用
ユーザはコンソールの手順に沿ってマシンのスペックやネットワーク、セキュリティ等の最低限の設定を行うだけで、数ステップでKafkaの実行環境を構築することができます。
MSKのセットアップ
MSKの概要について把握できたところで、セットアップ手順について詳しく見ていきましょう。
「Amazon MSK」→「クラスター」→「クラスターの作成」と進み、各種設定を行っていきます。
クラスターの設定
クラスターの作成方法は「クイック作成」と「カスタム作成」の2種類ありますが、今回はカスタム作成とします。クラスター名は適当なものを設定。
クラスタータイプはブローカーの数等を手動で設定できる「プロビジョンド」で進めます。Apache Kafkaバージョンは特にこだわりがない場合、デフォルトで選択されている推奨バージョンにしておきましょう。
今回は検証目的なので、ブローカーサイズはkafka.t3.small、ゾーン数は2、ゾーン当たりのブローカーは1とします。MSKはブローカーインスタンスのサイズ、アクティブなブローカー数、プロビジョニングしたストレージ量に基づいて課金されるので、料金に注意しながらご利用のユースケースに合わせて適宜変更してください。
ストレージと構成はデフォルトのまま進めます。
ネットワーク
MSKクラスタを作成するVPCとゾーンを設定します。
セキュリティグループは、ご利用の環境に合わせて適切なものを設定します。今回の場合、同一VPC内からのすべてのアクセスを許可するセキュリティグループを設定しています。
セキュリティ
デフォルトのままで進めます。
モニタリング
デフォルトのままで進めます。
各種設定値を確認後、問題がなければ「クラスターの作成」をクリックします。クラスタの作成には少し時間がかかるので、お茶でもしながら待ちましょう。以上でMSKのセットアップは完了です。
MSKにアクセス
環境準備
PythonプログラムからMSKクラスタにアクセスするにあたり、MSKクラスタと同じVPC内にEC2を用意しました。環境は以下の通りです。
- OS: Amazon Linux 2023
- Python: 3.12.0
GCP等ほかの環境からアクセスする場合でも、対象のMSKクラスタのブートストラップサーバのエンドポイントへの接続ができていれば問題ありません。
ロール作成
プログラムからMSKクラスタにアクセスし各種操作を行うためには、適切な権限をもった認証情報が必要です。認証情報の取得方法として権限を持ったアカウントのアクセスキーとシークレットキーを使う方法やロールから一時認証情報を取得する方法がありますが、今回はロールから一時認証情報を取得する方法で行います。
一時認証情報取得用ロール
まず、以下の許可ポリシーと信頼関係をもつロール"msk-test-role"を作成します。
許可ポリシー
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "sts:AssumeRole",
"Resource": "*"
}
]
}
※任意のリソースに対してAssumeRole(権限の引き受け)を可能にするポリシーです。
信頼関係
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
作成したロールはEC2にアタッチしておきます。
MSKクラスタ操作用ロール
次に、以下の許可ポリシーと信頼関係をもつロール"msk-iam-role"を作成します。
許可ポリシー
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster"
],
"Resource": [
"arn:aws:kafka:ap-northeast-1:xxxxxxxxxxxx:cluster/sample-msk-cluster/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:*Topic*",
"kafka-cluster:WriteData",
"kafka-cluster:ReadData"
],
"Resource": [
"arn:aws:kafka:ap-northeast-1:xxxxxxxxxxxx:topic/sample-msk-cluster/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup",
"kafka-cluster:DeleteGroup"
],
"Resource": [
"arn:aws:kafka:ap-northeast-1:xxxxxxxxxxxx:group/sample-msk-cluster/*"
]
}
]
}
※MSKクラスタへの接続と操作、Topicの操作、Groupの操作に関する権限を付与するポリシーです。Resource
部分には、ご利用のMSKクラスタのARNを指定してください。
信頼関係
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::xxxxxxxxxxxx:role/msk-test-role"
},
"Action": "sts:AssumeRole"
}
]
}
※"msk-test-role"からAssumeRoleするため、Principal
に"msk-test-role"を設定します。
プログラム実行
今回のPythonプログラムを実行するために必要なライブラリは以下の通りです。
confluent-kafka
aws-msk-iam-sasl-signer-python
また、環境変数を以下のように設定しておきます。
#!/bin/bash
export BOOTSTRAP_SERVERS=b-1.samplemskcluster.xxxxxx.xx.kafka.ap-northeast-1.amazonaws.com:9098,b-2.samplemskcluster.xxxxxx.xx.kafka.ap-northeast-1.amazonaws.com:9098
export AWS_DEFAULT_REGION=ap-northeast-1
export ROLE_ARN=arn:aws:iam::xxxxxxxxxxxx:role/msk-iam-role
BOOTSTRAP_SERVERS
に指定するのは、MSK管理画面の「クラスターの概要」→「クライアント情報の表示」で「プライベートエンドポイント」欄に記載されているエンドポイントです。
準備ができたら、いよいよプログラムの実行です。以下のプログラムをコピーして、topics.pyという名前で保存してください。
import os
from confluent_kafka.admin import AdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
# 環境変数から設定取得
bootstrap_servers = os.getenv("BOOTSTRAP_SERVERS")
aws_region = os.getenv("AWS_DEFAULT_REGION")
role_arn = os.getenv("ROLE_ARN")
def oauth_cb(oauth_config):
auth_token, _ = MSKAuthTokenProvider.generate_auth_token_from_role_arn(aws_region, role_arn)
return auth_token, _
def get_admin_client():
admin_client = AdminClient({
#"debug": "all",
"bootstrap.servers": bootstrap_servers,
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"oauth_cb": oauth_cb
})
return admin_client
def list_topics():
admin_client = get_admin_client()
admin_client.poll(0)
cluster_metadata = admin_client.list_topics()
print(f"topics: {cluster_metadata.topics}")
def create_topics(topic_list):
admin_client = get_admin_client()
admin_client.poll(0)
new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topic_list]
fs = admin_client.create_topics(new_topics)
for topic, f in fs.items():
try:
f.result()
print(f"Topic {topic} created.")
except Exception as e:
print(f"Failed to create topic {topic}: {e}")
def delete_topics(topic_list):
admin_client = get_admin_client()
admin_client.poll(0)
fs = admin_client.delete_topics(topic_list)
for topic, f in fs.items():
try:
f.result()
print(f"Topic {topic} deleted.")
except Exception as e:
print(f"Failed to delete topic {topic}: {e}")
def main():
topic_list = ["sample-topic-1"]
try:
create_topics(topic_list)
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
上記のプログラムは、"sample-topic-1"という名前のTopicをcreate_topics()
という関数で作成するプログラムです。他にもlist_topics()
やdelete_topics()
という関数を用意しており、それらを使ってTopicの一覧表示や削除もできます。
$ python topics.py
Topic sample-topic-1 created.
"sample-topic-1"が作成されました。
Topicを作成したので、今度はそのTopicに対してメッセージを送信してみます。以下のプログラムをコピーして、producer.pyという名前で保存してください。
import os
from confluent_kafka import Producer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
# 環境変数から設定取得
bootstrap_servers = os.getenv("BOOTSTRAP_SERVERS")
aws_region = os.getenv("AWS_DEFAULT_REGION")
role_arn = os.getenv("ROLE_ARN")
def oauth_cb(oauth_config):
auth_token, _ = MSKAuthTokenProvider.generate_auth_token_from_role_arn(aws_region, role_arn)
return auth_token, _
def get_producer():
producer = Producer({
#"debug": "all",
"bootstrap.servers": bootstrap_servers,
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"oauth_cb": oauth_cb
})
return producer
def delivery_report(err, msg):
if err is not None:
print(f"Message delivery failed: {err}")
else:
print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
def produce_messages(target_topic, message_list):
producer = get_producer()
producer.poll(0)
for message in message_list:
producer.produce(target_topic, message.encode("utf-8"), callback=delivery_report)
producer.flush()
def main():
target_topic = "sample-topic-1"
message_list = ["Hello, MSK!", "I'm Re:Q Taro.", "Nice to meet you."]
try:
produce_messages(target_topic, message_list)
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
上記はTopicにメッセージを送信するプログラムです。先ほど作成した"sample-topic-1"に対し、message_list
で指定されたメッセージを送信します。
$ python producer.py
Message delivered to sample-topic-1 [0]
Message delivered to sample-topic-1 [0]
Message delivered to sample-topic-1 [0]
メッセージが"sample-topic-1"に送信されました。
次はTopicに送信されたメッセージを受信してみます。以下のプログラムをコピーして、consumer.pyという名前で保存してください。
import os
from confluent_kafka import Consumer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
# 環境変数から設定取得
bootstrap_servers = os.getenv("BOOTSTRAP_SERVERS")
aws_region = os.getenv("AWS_DEFAULT_REGION")
role_arn = os.getenv("ROLE_ARN")
def oauth_cb(oauth_config):
auth_token, _ = MSKAuthTokenProvider.generate_auth_token_from_role_arn(aws_region, role_arn)
return auth_token, _
def get_consumer():
consumer = Consumer({
#"debug": "all",
"bootstrap.servers": bootstrap_servers,
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"oauth_cb": oauth_cb,
"group.id": "mygroup",
"auto.offset.reset": "earliest"
})
return consumer
def consume_messages(target_topics):
consumer = get_consumer()
consumer.subscribe(target_topics)
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
data = msg.value().decode("utf-8")
topic = msg.topic()
print(f"message: {data}, topic: {topic}")
consumer.close()
def main():
target_topics = ["sample-topic-1"]
try:
consume_messages(target_topics)
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
上記はTopicからメッセージを受信するプログラムです。"sample-topic-1"のメッセージを受信し出力します。
$ python consumer.py
message: Hello, MSK!, topic: sample-topic-1
message: I'm Re:Q Taro., topic: sample-topic-1
message: Nice to meet you., topic: sample-topic-1
先ほど"sample-topic-1"に送信したメッセージが受信され、出力されました。
さいごに
MSKのセットアップからPythonプログラムによるメッセージ送受信まで順を追って見てきました。MSKは簡単にセットアップできて運用管理が楽なだけではなく、AWSのIAM認証を利用して安全に通信やMSKクラスタの操作を行うことができます。今回ご紹介したようにPythonプログラムからも各種操作を行うことができるので、AWS内外のデータ基盤との連携がしやすいという点もメリットです。