Подключите Python к MSK с аутентификацией на основе ролей IAM.

Я написал скрипт Python с aiokafka для создания и потребления из кластера Kafka в AWS MSK. Я запускаю скрипт из экземпляра EC2, который находится в том же VPC, что и мой кластер, и когда я пытаюсь подключить свой скрипт к кластер отказывается принимать соединение:

Сценарий

      from aiokafka import AIOKafkaConsumer
import asyncio
import os
import sys


async def consume():
    bootstrap_server = os.environ.get('BOOTSTRAP_SERVER', 'localhost:9092')
    topic = os.environ.get('TOPIC', 'demo')
    group = os.environ.get('GROUP_ID', 'demo-group')
    consumer = AIOKafkaConsumer(
        topic, bootstrap_servers=bootstrap_server, group_id=group
    )

    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()


def main():
    try:
        asyncio.run(consume())
    except KeyboardInterrupt:
        print("Bye!")
        sys.exit(0)


if __name__ == "__main__":
    print("Welcome to Kafka test script. ctrl + c to exit")
    main()

Исключение

      Unable to request metadata from "boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098": KafkaConnectionError: Connection at boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098 closed
Traceback (most recent call last):
  File "producer.py", line 33, in <module>
    main()
  File "producer.py", line 25, in main
    asyncio.run(produce_message(message))
  File "/usr/lib64/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
  File "producer.py", line 12, in produce_message
    await producer.start()
  File "/home/ec2-user/py-kafka-test/pykafka/lib64/python3.7/site-packages/aiokafka/producer/producer.py", line 296, in start
    await self.client.bootstrap()
  File "/home/ec2-user/py-kafka-test/pykafka/lib64/python3.7/site-packages/aiokafka/client.py", line 250, in bootstrap
    f'Unable to bootstrap from {self.hosts}')
kafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('boot-zm5x2eaw.c3.kafka-serverless.us-east-1.amazonaws.com', 9098, <AddressFamily.AF_UNSPEC: 0>)]
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7f76d123a510>

Я уже протестировал соединение со сценариями оболочки kafka, и оно сработало нормально:

      ./kafka-console-producer.sh --bootstrap-server boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098 --producer.config client.properties  --topic myTopic

Но всякий раз, когда я пытаюсь использовать python, он просто не работает, я немного исследовал и обнаружил, что это может быть протокол аутентификации, мой кластер KMS защищен с помощью аутентификации на основе ролей IAM, но сколько бы я ни искал, нет документация по аутентификации с помощью IAM в библиотеках python kafka: aiokafka, python-kafka, faust и т. д.

Есть ли у кого-нибудь пример успешного подключения к бессерверному кластеру KMS с аутентификацией на основе ролей IAM с использованием Python?

1 ответ

Хотя это невозможно сделать с помощью существующих пакетов Python, вот как мне удалось это сделать, обернув клиентские сценарии Kafka внутри подпроцесса Python.

      # producer
def create_cli_producer(arguments):
    print(f"Initializing kafka producer for servers: {arguments.kafka_servers}")
    print(f"topic: {arguments.pub_topic}")

    kafka_producer_init_cmd = [
        f"{arguments.kafka_path}/bin/kafka-console-producer.sh",
        "--topic", arguments.pub_topic,
        "--bootstrap-server", arguments.kafka_servers
    ]

    if arguments.configs:
        kafka_producer_init_cmd = kafka_producer_init_cmd + ["--producer.config", arguments.configs]

    try:
        proc = subp.Popen(kafka_producer_init_cmd, stdin=subp.PIPE)
        print("kafka producer init done.")
        return proc
    except Exception as e:
        print(f"Error creating producer: {e}")
        return None

# consumer.py
def consume_messages(consumer, producer):
    print('Listening for new messages...')
    try:
        for line in consumer.stdout:
            rcvd_msg = line.decode().strip()
            print(f"Received: {rcvd_msg}")

            send_msg_thread = threading.Thread(target=send_message, args=(producer, rcvd_msg))
            send_msg_thread.daemon = True
            send_msg_thread.start()
    except KeyboardInterrupt:
        # If the user interrupts the program (e.g., by pressing Ctrl+C),
        # terminate the subprocess gracefully
        consumer.terminate()
        consumer.wait()

    finally:
        # Capture and print any error messages from the consumer's standard error stream
        for error_line in consumer.stderr:
            print("Error:", error_line.decode().strip())


def send_message(producer, msg):
    # Publish the received message to the producer
    try:
        print(f"Publishing message: {msg}")
        producer.stdin.write(msg.encode() + b"\n")
        producer.stdin.flush()
    except Exception as e:
        print(f"Error sending message: {e}")

Получите полный код здесь https://github.com/maxcotec/aws-IAM-auth-msk-python .

Посмотреть демо можно здесь: https://youtu.be/xL5wFLF4nrk

Другие вопросы по тегам