EmbeddedKafka выдает RecordTooLargeException, даже если значения конфигурации установлены
Я пытаюсь увеличить размер сообщения по умолчанию kafka с 1 МБ до 10 МБ. Я тестирую свою новую конфигурацию с EmbeddedKafka и ScalaTest, но она не работает.
Используя этот ответ, я соответственно увеличил значения конфигурации:
Маклер:
message.max.bytes
replica.fetch.max.bytes
Потребитель:
max.partition.fetch.bytes
Режиссер:
max.request.size
Мой код:
val broker = s"localhost:${kafkaConfig.kafkaPort}"
val maxSize: String = (ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES * 10).toString // 10MiB
val embeddedBrokerConfig = Map(
"message.max.bytes" -> maxSize,
"replica.fetch.max.bytes" -> maxSize
)
val embeddedConsumerConfig = Map[String, String](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> broker,
ConsumerConfig.GROUP_ID_CONFIG -> consumerGroup,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG -> maxSize
)
val embeddedProducerConfig = Map[String, String](
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> broker,
ProducerConfig.MAX_REQUEST_SIZE_CONFIG -> maxSize
)
val bigKafkaConfig =
EmbeddedKafkaConfig(
kafkaConfig.kafkaPort,
kafkaConfig.zooKeeperPort,
customBrokerProperties = embeddedBrokerConfig,
customConsumerProperties = embeddedConsumerConfig,
customProducerProperties = embeddedProducerConfig
)
val bigMessage = ("H" * 999999).getBytes()
EmbeddedKafka.publishToKafka(inTopic, bigMessage)(bigKafkaConfig, valueSerializer)
Когда я запускаю этот код с сообщением только 999999 байт, что меньше 1 МБ, я получаю эту ошибку:
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:77)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at net.manub.embeddedkafka.EmbeddedKafkaSupport.$anonfun$publishToKafka$7(EmbeddedKafka.scala:276)
at scala.util.Try$.apply(Try.scala:209)
at net.manub.embeddedkafka.EmbeddedKafkaSupport.publishToKafka(EmbeddedKafka.scala:276)
Это ошибка в EmbeddedKafka? Или я неправильно настроил свое приложение?
1 ответ
Я нашел проблему. Это было связано с конфигурацией EmbeddedKafka
,
Был beforeAll
утверждение запуска EmbeddedKafka перед моими тестами с конфигурацией по умолчанию. EmbeddedKafka.start
требует обновленных настроек брокера, передавая их для публикации после EmbeddedKafka
начал ничего не делает.