У новых версий производителей Kafka все еще есть "factory.type"?
В старых версиях документа сказано, что это одно из важнейших свойств.
Документ более новой версии не упоминает об этом вообще.
Есть ли у более новых версий производителей Kafka еще producer.type
?
Или новые производители всегда async
и я должен позвонить future.get()
сделать это sync
?
2 ответа
Новые производители всегда асинхронны, и вы должны вызвать future.get() для его синхронизации. Не стоит создавать два метода apis, когда что-то простое, например, добавление future.get(), дает практически одинаковую функциональность.
Из документации для отправки () здесь
Поскольку вызов send является асинхронным, он возвращает Future для RecordMetadata, который будет назначен этой записи. Вызов get () для этого будущего будет блокировать до тех пор, пока соответствующий запрос не завершится, а затем вернет метаданные для записи или сгенерирует любое исключение, возникшее при отправке записи.
Если вы хотите смоделировать простой блокирующий вызов, вы можете немедленно вызвать метод get ():
byte[] key = "key".getBytes(); byte[] value = "value".getBytes(); ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value); producer.send(record).get();
Почему вы хотите сделать send()
синхронизировать?
Это функция kafka для пакетного сообщения для лучшей пропускной способности.
Асинхронная отправка
Пакетирование является одним из главных факторов повышения эффективности, и для обеспечения возможности пакетирования производитель Kafka будет пытаться накапливать данные в памяти и отправлять большие пакеты в одном запросе. Пакетная обработка может быть сконфигурирована так, чтобы накапливать не более фиксированного количества сообщений и ждать не дольше, чем некоторая фиксированная задержка (скажем, 64 КБ или 10 мс). Это позволяет аккумулировать больше байтов для отправки и несколько больших операций ввода-вывода на серверах. Эта буферизация настраивается и дает механизм для замены небольшого дополнительного времени ожидания для лучшей пропускной способности.
Невозможно выполнить синхронизацию отправки, потому что API поддерживает только асинхронный метод, но есть некоторые конфиги, которые вы можете указать для выполнения некоторой работы.
Вы можете установить batch.size равным 0. В этом случае отправка сообщений отключена.
Однако я думаю, что вы должны просто оставить batch.size по умолчанию и установить для linger.ms значение 0 (это также по умолчанию). В этом случае, если много сообщений пришло в одно и то же время, они будут объединены в одну отправку немедленно.
Производитель группирует все записи, которые поступают между передачами запроса, в один пакетный запрос. Обычно это происходит только под нагрузкой, когда записи поступают быстрее, чем их можно отправить.
И если вы хотите убедиться, что сообщение отправлено и успешно сохранено, вы можете установить подтверждение на -1 или 1 и повторить попытку на 3 (например)
Более подробную информацию о конфигурации производителя вы можете получить по https://kafka.apache.org/documentation/.