Описание тега aiokafka

Клиент Apache Kafka для asyncio
1 ответ

Какое разделение использовать при выполнении поиска смещения с помощью AIOKafkaConsumer?

При попытке позволить AIOKafkaConsumer начать читать сообщения с определенного смещения starting_offset, как узнать, какой раздел использовать? Я пытаюсь использовать AIOKafkaConsumer.seek , но для этого требуется указать TopicPartition в. import as…
0 ответов

производитель aiokafka, работающий на одном ядре в контейнере python:buster Docker, обеспечивает очень низкую пропускную способность 1 МиБ в секунду

У меня есть простой код для проверки производительности библиотеки. Я использую компьютер с Windows, на котором запущен Docker для Windows и виртуальная машина с 8 ядрами. В этой ситуации библиотека, похоже, обеспечивает очень низкую пропускную спос…
06 дек '21 в 15:03
0 ответов

Как преобразовать операцию ThreadPoolExecutor в asyncio — Python

У меня есть код, который работает с использованием ThreadPoolExecutor и работает параллельно с использованием рабочих потоков. from aiokafka import AIOKafkaProducer producer = AIOKafkaProducer(....) def consume_messages_from_sqs(sqs): with ThreadPoo…
1 ответ

Подключите Python к MSK с аутентификацией на основе ролей IAM.

Я написал скрипт Python с aiokafka для создания и потребления из кластера Kafka в AWS MSK. Я запускаю скрипт из экземпляра EC2, который находится в том же VPC, что и мой кластер, и когда я пытаюсь подключить свой скрипт к кластер отказывается приним…
0 ответов

Айокафка не подключается молча

Я пытаюсь подключить потребителя aiokafka со следующей конфигурацией. kafka_config = { "bootstrap_servers": "b-2.cpkafkacluster-2106.vi24p2.c1.kafka.eu-central-1.amazonaws.com:9096", "security_protocol": "SASL_SSL", "sasl_mechanism": "SCRAM-SHA-512"…
24 авг '22 в 12:00
0 ответов

Получение RuntimeError: цикл событий закрыт, а будущее принадлежит другому циклу, отличному от указанного в качестве аргумента цикла — asyncio + Python

Я пытаюсь заархивировать минимальные асинхронные параллельные задачи, загружая некоторое содержимое и помещая его в очередь Kafka. Но все работает так, как ожидалось, за исключением того, что выдает ошибку и выходит при первом нажатии. Ошибка: venv\…
0 ответов

python fastapi, проблема голодания айокафки

Проблема: потребитель aiokafka недополучает конечную точку fastapi, из-за чего наши проверки работоспособности kubernetes дают сбой, а время ожидания любого другого сервиса, вызывающего открытые конечные точки, истекает. Подробности : Существует пот…
0 ответов

Aiokafka обрабатывает события потребителей параллельно

Я только начинаю работать с kafka, у меня есть кластер k8s, в котором я хочу развернуть прослушиватели событий. Когда у меня работает один слушатель, все работает нормально, но с несколькими подами они обрабатывают события параллельно, хотелось бы, …
24 янв '23 в 21:14
0 ответов

Попытка заставить AIOKafka работать с самозаверяющим сертификатом (Python)

Я уже день бьюсь головой о клавиатуру, поэтому сдаюсь и прошу помощи. У меня есть работающий потребитель, использующий confluence kafka, но мне нужно заставить его работать как сопрограмму, чтобы я мог работать с FastAPI. Я действительно хотел попро…
08 июл '22 в 14:39
0 ответов

python aiokafka многие потребители многим производителям

Я использую aiokafka для потребления, фильтрации полей сообщений и отправки сообщений обратно в kafka. Я запускаю 4 асинхронных потребителя, которые помещают сообщения в асинхронную очередь. Затем один процесс использует эту очередь и создает асинхр…
03 мар '23 в 17:33
0 ответов

Потребитель Aiokafka застрял в бесконечном цикле

Привет, я использую потребителя AioKafka для чтения сообщения, опубликованного другим процессом. Другой процесс только что опубликовал одно сообщение, и мой потребительский код бесконечно читает одно и то же сообщение. Я пытался использовать ручную …
14 ноя '22 в 06:06
1 ответ

aiokafka завершает работу при работе в многопроцессорном классе

Сегодня я бился головой о стену, пытаясь понять, почему это не работает. Я создал этот класс многопроцессорности: class Consumer(multiprocessing.Process): def __init__(self, topic, **kwargs): self.topic = topic super(Consumer, self).__init__(**kwarg…
0 ответов

Ошибка «Не удалось выполнить автоматическую фиксацию смещения: [Ошибка 25] UnknownMemberIdError:» после нескольких дней использования.

Эта проблема всегда возникает через несколько дней после того, как микросервисы общаются с Kafka, у меня 3 узла, и для каждого микросервиса я использую идентификатор группы по определенной теме. Ошибка заключается в следующем. Unable connect to node…
03 апр '23 в 08:04
1 ответ

невозможно получить асинхронное сообщение от производителя и потребителя

Кафка, смотритель зоопарка, работает успешно Это мой продюсер.py async def publish(): producer = AIOKafkaProducer(bootstrap_servers='localhost:9092', enable_idempotence=True) await producer.start() consumer = AIOKafkaConsumer( topicAKG, bootstrap_se…
09 май '23 в 11:55
0 ответов

Потребительские записи Kafka — обработка

В Kafka я использую getmany для чтения сообщений потребителей. Всего из 650 сообщений (обработка которых займет около 3 дней) обработка происходит примерно для 100-150 записей (иногда 12 часов, а иногда и 24 часов), после чего дальнейшая обработка н…
0 ответов

Как правильно управлять FastAPI WebSockets с помощью AIOKafka Consumer

У меня есть простая конечная точка веб-сокета в FastAPI, которая получает данные с сервера Kafka с пакетом AIOKafka и отправляет их через веб-сокет. @router.websocket("/ws/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id: i…
14 мар '23 в 16:48
0 ответов

Python aiokafka – как установить максимальный лимит повторов для каждого сообщения?

Я использую aiokafka 0.8.0 в Python 3.9 для изучения темы Kafka. Иногда обработка завершается сбоем, что обычно рассматривается как исключение Retryable, поэтому вскоре после этого сообщение отправляется снова. Однако в некоторых случаях исключение …
27 май '23 в 05:59