Попытка заставить AIOKafka работать с самозаверяющим сертификатом (Python)

Я уже день бьюсь головой о клавиатуру, поэтому сдаюсь и прошу помощи.

У меня есть работающий потребитель, использующий confluence kafka, но мне нужно заставить его работать как сопрограмму, чтобы я мог работать с FastAPI. Я действительно хотел попробовать AIOKafka для этого, но я не могу заставить его работать с самозаверяющим сертификатом (это в нашей среде разработки).

Вот рабочая конфигурация для моего потребителя kafka confluence:

      conf = {
    "bootstrap.servers": "10.142.252.214:9093",
    "group.id": "myConsumerID",
    "security.protocol": "SASL_SSL",
    "sasl.username": kafkaUser,
    "sasl.password": kafkaPass,
    "sasl.mechanisms": "PLAIN",
    "enable.ssl.certificate.verification": "False",
    "on_commit": commit_completed,
    "heartbeat.interval.ms": "1000",
    "socket.connection.setup.timeout.ms": "10000",
    "auto.offset.reset": "earliest"
}

Вот код, который я пытаюсь использовать для AIOKafka

      async def consume():
    cert = "../foo/cert/certificate.pem"
    key = "../foo/cert/key.pem"

    context2 = ssl.create_default_context()
    context2.load_cert_chain(certfile=cert, keyfile=key)
    context2.check_hostname = False
    context2.verify_mode = CERT_NONE
    #context2.ssl_cafile="../foo/cert/CARoot.pem"
    context2.ssl_certfile = "cert.pem"
    context2.ssl_keyfile = "key.pem"
    context2.ssl_password = kafkaKey
    context2.ssl_keystore_type = "PEM"

    consumer = AIOKafkaConsumer(
        'TopicA', 'TopicB',
        bootstrap_servers="10.142.252.214:9093",
        group_id="myConsumerGroup",
        sasl_plain_username="kafkaUser",
        sasl_plain_password="kafkaPass",
        sasl_mechanism="PLAIN",
        security_protocol="SASL_SSL",
        ssl_context=context2)

    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

Когда я пытаюсь запустить это, я просто получаю самые загадочные ошибки, и я не могу понять, с чего начать, пытаясь понять, что не так.

      $ python test-main.py
Traceback (most recent call last):
  File "/Users/myUser/.pyenv/versions/3.10.5/lib/python3.10/site-packages/aiokafka/conn.py", line 375, in _on_read_task_error
    read_task.result()
  File "/Users/myUser/.pyenv/versions/3.10.5/lib/python3.10/site-packages/aiokafka/conn.py", line 518, in _read
    resp = await reader.readexactly(4)
  File "/Users/myUser/.pyenv/versions/3.10.5/lib/python3.10/asyncio/streams.py", line 706, in readexactly
    raise exceptions.IncompleteReadError(incomplete, n)
asyncio.exceptions.IncompleteReadError: 0 bytes read on a total of 4 expected bytes

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/myUser/scripts/ansible-hello-world/test-main.py", line 165, in <module>
    asyncio.run(consume())
  File "/Users/myUser/.pyenv/versions/3.10.5/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/myUser/.pyenv/versions/3.10.5/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/Users/myUser/scripts/ansible-hello-world/test-main.py", line 155, in consume
    await consumer.start()
  File "/Users/myUser/.pyenv/versions/3.10.5/lib/python3.10/site-packages/aiokafka/consumer/consumer.py", line 346, in start
    await self._client.bootstrap()
  File "/Users/myUser/.pyenv/versions/3.10.5/lib/python3.10/site-packages/aiokafka/client.py", line 210, in bootstrap
    bootstrap_conn = await create_conn(
  File "/Users/myUser/.pyenv/versions/3.10.5/lib/python3.10/site-packages/aiokafka/conn.py", line 96, in create_conn
    await conn.connect()
  File "/Users/myUser/.pyenv/versions/3.10.5/lib/python3.10/site-packages/aiokafka/conn.py", line 234, in connect
    await self._do_sasl_handshake()
  File "/Users/myUser/.pyenv/versions/3.10.5/lib/python3.10/site-packages/aiokafka/conn.py", line 314, in _do_sasl_handshake
    auth_bytes = await self._send_sasl_token(
  File "/Users/myUser/.pyenv/versions/3.10.5/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
    return fut.result()
kafka.errors.KafkaConnectionError: KafkaConnectionError: Connection at 10.142.252.214:9093 closed
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x107338670>

0 ответов

Другие вопросы по тегам