Подключение к Confluent Cloud с помощью клиента AIOKafka

Я пытаюсь подключиться к своему Confluent Cloud Кластер Kafka с использованием модифицированной версии AIOKafka ssl_consume_produce.py пример в AIOKafkaрепо на https://github.com/aio-libs/aiokafka/blob/master/examples/ssl_consume_produce.py. Я настроил свойAIOKafkaConsumer а также AIOKafkaProducer с тем, что я считаю правильными параметрами, но получаю следующую ошибку времени выполнения:

/Users/galen/opt/anaconda3/envs/ds/bin/python /Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py
Traceback (most recent call last):
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 57, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 52, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 23, in produce_and_consume
    await producer.start()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 171, in start
    await self.client.bootstrap()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/client.py", line 203, in bootstrap
    version_hint=version_hint)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 90, in create_conn
    await conn.connect()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 214, in connect
    await self._do_sasl_handshake()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 281, in _do_sasl_handshake
    payload, expect_response = res
RuntimeError: await wasn't used with future
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7f9bc818d350>

Моя адаптированная версия кода:

import asyncio
from ssl import create_default_context, Purpose
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from kafka.common import TopicPartition

import ccloud_lib

ssl_context = create_default_context(Purpose.SERVER_AUTH, cafile='cacert.pem')
conf = ccloud_lib.read_ccloud_config('kafka_config.conf')

async def produce_and_consume(loop):
    # Produce
    producer = AIOKafkaProducer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop = loop,
        security_protocol='SASL_SSL',
        ssl_context=ssl_context,
        sasl_mechanism='PLAIN',
        sasl_plain_password=conf['sasl.password'],
        sasl_plain_username=conf['sasl.username']
    )

    await producer.start()
    try:
        msg = await producer.send_and_wait(
            'my_topic', b"Super Message", partition=0)
    finally:
        await producer.stop()

    consumer = AIOKafkaConsumer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop=loop,
        ssl_context=ssl_context,
        security_protocol='SASL_SSL',
        sasl_mechanism='PLAIN',
        sasl_plain_password=conf['sasl.password'],
        sasl_plain_username=conf['sasl.username']
    )
    await consumer.start()
    try:
        consumer.seek(TopicPartition('my_topic', 0), msg.offset)
        fetch_msg = await consumer.getone()
    finally:
        await consumer.stop()

    print("Success", msg, fetch_msg)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    task = loop.create_task(produce_and_consume(loop))
    try:
        loop.run_until_complete(task)
    finally:
        loop.run_until_complete(asyncio.sleep(0, loop=loop))
        task.cancel()
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass

Моя запутанная конфигурация conf выглядит так:

bootstrap.servers=*****.us-central1.gcp.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="********************" password\="****************************************";
sasl.username=********************
sasl.password=********************************************************************************
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=********************:********************
schema.registry.url=https://********************.us-central1.gcp.confluent.cloud

Можно ли подключиться к Confluent Cloudс помощью клиента AIOKafka? Что-то в моей конфигурации неверно?

0 ответов

Другие вопросы по тегам