Описание тега aiokafka
Клиент Apache Kafka для asyncio
1
ответ
Какое разделение использовать при выполнении поиска смещения с помощью AIOKafkaConsumer?
При попытке позволить AIOKafkaConsumer начать читать сообщения с определенного смещения starting_offset, как узнать, какой раздел использовать? Я пытаюсь использовать AIOKafkaConsumer.seek , но для этого требуется указать TopicPartition в. import as…
05 авг '21 в 08:04
0
ответов
производитель aiokafka, работающий на одном ядре в контейнере python:buster Docker, обеспечивает очень низкую пропускную способность 1 МиБ в секунду
У меня есть простой код для проверки производительности библиотеки. Я использую компьютер с Windows, на котором запущен Docker для Windows и виртуальная машина с 8 ядрами. В этой ситуации библиотека, похоже, обеспечивает очень низкую пропускную спос…
06 дек '21 в 15:03
0
ответов
Как преобразовать операцию ThreadPoolExecutor в asyncio — Python
У меня есть код, который работает с использованием ThreadPoolExecutor и работает параллельно с использованием рабочих потоков. from aiokafka import AIOKafkaProducer producer = AIOKafkaProducer(....) def consume_messages_from_sqs(sqs): with ThreadPoo…
14 июл '22 в 09:07
1
ответ
Подключите Python к MSK с аутентификацией на основе ролей IAM.
Я написал скрипт Python с aiokafka для создания и потребления из кластера Kafka в AWS MSK. Я запускаю скрипт из экземпляра EC2, который находится в том же VPC, что и мой кластер, и когда я пытаюсь подключить свой скрипт к кластер отказывается приним…
05 июн '22 в 14:58
0
ответов
Айокафка не подключается молча
Я пытаюсь подключить потребителя aiokafka со следующей конфигурацией. kafka_config = { "bootstrap_servers": "b-2.cpkafkacluster-2106.vi24p2.c1.kafka.eu-central-1.amazonaws.com:9096", "security_protocol": "SASL_SSL", "sasl_mechanism": "SCRAM-SHA-512"…
24 авг '22 в 12:00
0
ответов
Получение RuntimeError: цикл событий закрыт, а будущее принадлежит другому циклу, отличному от указанного в качестве аргумента цикла — asyncio + Python
Я пытаюсь заархивировать минимальные асинхронные параллельные задачи, загружая некоторое содержимое и помещая его в очередь Kafka. Но все работает так, как ожидалось, за исключением того, что выдает ошибку и выходит при первом нажатии. Ошибка: venv\…
15 июл '22 в 13:01
0
ответов
python fastapi, проблема голодания айокафки
Проблема: потребитель aiokafka недополучает конечную точку fastapi, из-за чего наши проверки работоспособности kubernetes дают сбой, а время ожидания любого другого сервиса, вызывающего открытые конечные точки, истекает. Подробности : Существует пот…
11 окт '22 в 08:09
0
ответов
Aiokafka обрабатывает события потребителей параллельно
Я только начинаю работать с kafka, у меня есть кластер k8s, в котором я хочу развернуть прослушиватели событий. Когда у меня работает один слушатель, все работает нормально, но с несколькими подами они обрабатывают события параллельно, хотелось бы, …
24 янв '23 в 21:14
0
ответов
Попытка заставить AIOKafka работать с самозаверяющим сертификатом (Python)
Я уже день бьюсь головой о клавиатуру, поэтому сдаюсь и прошу помощи. У меня есть работающий потребитель, использующий confluence kafka, но мне нужно заставить его работать как сопрограмму, чтобы я мог работать с FastAPI. Я действительно хотел попро…
08 июл '22 в 14:39
0
ответов
python aiokafka многие потребители многим производителям
Я использую aiokafka для потребления, фильтрации полей сообщений и отправки сообщений обратно в kafka. Я запускаю 4 асинхронных потребителя, которые помещают сообщения в асинхронную очередь. Затем один процесс использует эту очередь и создает асинхр…
03 мар '23 в 17:33
0
ответов
Потребитель Aiokafka застрял в бесконечном цикле
Привет, я использую потребителя AioKafka для чтения сообщения, опубликованного другим процессом. Другой процесс только что опубликовал одно сообщение, и мой потребительский код бесконечно читает одно и то же сообщение. Я пытался использовать ручную …
14 ноя '22 в 06:06
1
ответ
aiokafka завершает работу при работе в многопроцессорном классе
Сегодня я бился головой о стену, пытаясь понять, почему это не работает. Я создал этот класс многопроцессорности: class Consumer(multiprocessing.Process): def __init__(self, topic, **kwargs): self.topic = topic super(Consumer, self).__init__(**kwarg…
02 май '23 в 21:15
0
ответов
Ошибка «Не удалось выполнить автоматическую фиксацию смещения: [Ошибка 25] UnknownMemberIdError:» после нескольких дней использования.
Эта проблема всегда возникает через несколько дней после того, как микросервисы общаются с Kafka, у меня 3 узла, и для каждого микросервиса я использую идентификатор группы по определенной теме. Ошибка заключается в следующем. Unable connect to node…
03 апр '23 в 08:04
1
ответ
невозможно получить асинхронное сообщение от производителя и потребителя
Кафка, смотритель зоопарка, работает успешно Это мой продюсер.py async def publish(): producer = AIOKafkaProducer(bootstrap_servers='localhost:9092', enable_idempotence=True) await producer.start() consumer = AIOKafkaConsumer( topicAKG, bootstrap_se…
09 май '23 в 11:55
0
ответов
Потребительские записи Kafka — обработка
В Kafka я использую getmany для чтения сообщений потребителей. Всего из 650 сообщений (обработка которых займет около 3 дней) обработка происходит примерно для 100-150 записей (иногда 12 часов, а иногда и 24 часов), после чего дальнейшая обработка н…
17 апр '23 в 02:37
0
ответов
Как правильно управлять FastAPI WebSockets с помощью AIOKafka Consumer
У меня есть простая конечная точка веб-сокета в FastAPI, которая получает данные с сервера Kafka с пакетом AIOKafka и отправляет их через веб-сокет. @router.websocket("/ws/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id: i…
14 мар '23 в 16:48
0
ответов
Python aiokafka – как установить максимальный лимит повторов для каждого сообщения?
Я использую aiokafka 0.8.0 в Python 3.9 для изучения темы Kafka. Иногда обработка завершается сбоем, что обычно рассматривается как исключение Retryable, поэтому вскоре после этого сообщение отправляется снова. Однако в некоторых случаях исключение …
27 май '23 в 05:59