Подключите Python к MSK с аутентификацией на основе ролей IAM.
Я написал скрипт Python с aiokafka для создания и потребления из кластера Kafka в AWS MSK. Я запускаю скрипт из экземпляра EC2, который находится в том же VPC, что и мой кластер, и когда я пытаюсь подключить свой скрипт к кластер отказывается принимать соединение:
Сценарий
from aiokafka import AIOKafkaConsumer
import asyncio
import os
import sys
async def consume():
bootstrap_server = os.environ.get('BOOTSTRAP_SERVER', 'localhost:9092')
topic = os.environ.get('TOPIC', 'demo')
group = os.environ.get('GROUP_ID', 'demo-group')
consumer = AIOKafkaConsumer(
topic, bootstrap_servers=bootstrap_server, group_id=group
)
await consumer.start()
try:
# Consume messages
async for msg in consumer:
print("consumed: ", msg.topic, msg.partition, msg.offset,
msg.key, msg.value, msg.timestamp)
finally:
# Will leave consumer group; perform autocommit if enabled.
await consumer.stop()
def main():
try:
asyncio.run(consume())
except KeyboardInterrupt:
print("Bye!")
sys.exit(0)
if __name__ == "__main__":
print("Welcome to Kafka test script. ctrl + c to exit")
main()
Исключение
Unable to request metadata from "boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098": KafkaConnectionError: Connection at boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098 closed
Traceback (most recent call last):
File "producer.py", line 33, in <module>
main()
File "producer.py", line 25, in main
asyncio.run(produce_message(message))
File "/usr/lib64/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/usr/lib64/python3.7/asyncio/base_events.py", line 587, in run_until_complete
return future.result()
File "producer.py", line 12, in produce_message
await producer.start()
File "/home/ec2-user/py-kafka-test/pykafka/lib64/python3.7/site-packages/aiokafka/producer/producer.py", line 296, in start
await self.client.bootstrap()
File "/home/ec2-user/py-kafka-test/pykafka/lib64/python3.7/site-packages/aiokafka/client.py", line 250, in bootstrap
f'Unable to bootstrap from {self.hosts}')
kafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('boot-zm5x2eaw.c3.kafka-serverless.us-east-1.amazonaws.com', 9098, <AddressFamily.AF_UNSPEC: 0>)]
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7f76d123a510>
Я уже протестировал соединение со сценариями оболочки kafka, и оно сработало нормально:
./kafka-console-producer.sh --bootstrap-server boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098 --producer.config client.properties --topic myTopic
Но всякий раз, когда я пытаюсь использовать python, он просто не работает, я немного исследовал и обнаружил, что это может быть протокол аутентификации, мой кластер KMS защищен с помощью аутентификации на основе ролей IAM, но сколько бы я ни искал, нет документация по аутентификации с помощью IAM в библиотеках python kafka: aiokafka, python-kafka, faust и т. д.
Есть ли у кого-нибудь пример успешного подключения к бессерверному кластеру KMS с аутентификацией на основе ролей IAM с использованием Python?
1 ответ
Хотя это невозможно сделать с помощью существующих пакетов Python, вот как мне удалось это сделать, обернув клиентские сценарии Kafka внутри подпроцесса Python.
# producer
def create_cli_producer(arguments):
print(f"Initializing kafka producer for servers: {arguments.kafka_servers}")
print(f"topic: {arguments.pub_topic}")
kafka_producer_init_cmd = [
f"{arguments.kafka_path}/bin/kafka-console-producer.sh",
"--topic", arguments.pub_topic,
"--bootstrap-server", arguments.kafka_servers
]
if arguments.configs:
kafka_producer_init_cmd = kafka_producer_init_cmd + ["--producer.config", arguments.configs]
try:
proc = subp.Popen(kafka_producer_init_cmd, stdin=subp.PIPE)
print("kafka producer init done.")
return proc
except Exception as e:
print(f"Error creating producer: {e}")
return None
# consumer.py
def consume_messages(consumer, producer):
print('Listening for new messages...')
try:
for line in consumer.stdout:
rcvd_msg = line.decode().strip()
print(f"Received: {rcvd_msg}")
send_msg_thread = threading.Thread(target=send_message, args=(producer, rcvd_msg))
send_msg_thread.daemon = True
send_msg_thread.start()
except KeyboardInterrupt:
# If the user interrupts the program (e.g., by pressing Ctrl+C),
# terminate the subprocess gracefully
consumer.terminate()
consumer.wait()
finally:
# Capture and print any error messages from the consumer's standard error stream
for error_line in consumer.stderr:
print("Error:", error_line.decode().strip())
def send_message(producer, msg):
# Publish the received message to the producer
try:
print(f"Publishing message: {msg}")
producer.stdin.write(msg.encode() + b"\n")
producer.stdin.flush()
except Exception as e:
print(f"Error sending message: {e}")
Получите полный код здесь https://github.com/maxcotec/aws-IAM-auth-msk-python .
Посмотреть демо можно здесь: https://youtu.be/xL5wFLF4nrk