Тест интеграции Spring Kafka Ошибка при записи в файл highwatermark
Я пишу интеграционный тест с использованием spring-kaka-2.2.0 в приложении весенней загрузки, мне почти все-таки удалось, мой тестовый набор вернул true, но после этого я вижу множественные ошибки.
2019-02-21 11:12:35.434 ERROR 5717 --- [ Thread-7] kafka.server.ReplicaManager : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645
org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory)
Тест Конфиг
@EnableKafka
@TestConfiguration
public class KafkaProducerConfigTest {
@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
return new EmbeddedKafkaBroker(1,false,2,"test-events");
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
return kafkaTemplate;
}
@Bean("consumerFactory")
public ConsumerFactory<String, Professor> createConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
JsonDeserializer<Professor> jsonDeserializer = new JsonDeserializer<>(Professor.class,false);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), jsonDeserializer);
}
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, Professor> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Professor> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(createConsumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(AckMode.BATCH);
return factory;
}
@Bean
public StringJsonMessageConverter converter() {
return new StringJsonMessageConverter();
}
@Bean
public Listener listener() {
return new Listener();
}
public class Listener {
public final CountDownLatch latch = new CountDownLatch(1);
@Getter
public List<Professor> list;
@KafkaListener(topics = "test-events", containerFactory = "kafkaListenerContainerFactory")
public void listen1(List<Professor> foo) {
list=foo;
this.latch.countDown();
}
}
}
Тестовый класс
@EnableKafka
@SpringBootTest(classes = { KafkaProducerConfigTest.class })
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {
@Autowired
private KafkaConsumerService kafkaConsumerService;
@Autowired
private Listener listener;
@Test
public void testReceive() throws Exception {
Professor professor = new Professor("Ajay", new Department("social", 1234));
List<Professor> pro = new ArrayList<>();
pro.add(professor);
System.out.println(pro);
kafkaConsumerService.professor(pro);
System.out.println("The professor object is sent to kafka -----------------------------------");
listener.latch.await();
List<Professor> result = listener.getList();
Professor resultPro = result.get(0);
System.out.println(result);
System.out.println(resultPro);
assertEquals(pro.get(0).getName(), result.get(0).getName());
}
}
Прецедент testReceive()
проходит, но все еще с несколькими сообщениями об ошибках
Ошибка 1 с трассировкой стека
019-02-21 11:12:35.434 ERROR 5717 --- [ Thread-7] kafka.server.ReplicaManager : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645
org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory)
Ошибка 2 с трассировкой стека
2019-02-21 11:12:35.446 WARN 5717 --- [pool-8-thread-1] kafka.utils.CoreUtils$ : /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/__consumer_offsets-4/00000000000000000000.index (No such file or directory)
java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/__consumer_offsets-4/00000000000000000000.index (No such file or directory)
Ошибка 3 с трассировкой стека
2019-02-21 11:12:35.451 WARN 5717 --- [pool-8-thread-1] kafka.utils.CoreUtils$ : /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/test-events-0/00000000000000000000.timeindex (No such file or directory)
java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/test-events-0/00000000000000000000.timeindex (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method) ~[na:1.8.0_191]
1 ответ
У вас действительно есть права на запись в /var/folders/s3 ...
?
Вы можете изменить местоположение с помощью
@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
return new EmbeddedKafkaBroker(1,false,2,"test-events")
.brokerProperties(Collections.singletonMap(KafkaConfig.LogDirProp(), "/tmp/foo"));
}
У меня была аналогичная проблема, и с помощью ответа Гэри Рассела я решил ее, указав каталог журнала на выходной каталог сборки gradle. log.dir=out/embedded-kafka
или в случае maven log.dir=target/embedded-kafka
.
В следующем фрагменте кода показано, как это сделать с помощью @EmbeddedKafka
.
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = {Application.class})
@EmbeddedKafka(
topics = "topic",
partitions = 1,
controlledShutdown = true,
brokerProperties={
"log.dir=out/embedded-kafka"
})
@TestPropertySource(
properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"
})
public class OutboxEventsTest {
...
}
Просто измените свойства брокера для Embedded Kafka
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {MyApplication.class})
@TestPropertySource(locations = "classpath:application-test.properties")
@EmbeddedKafka(
topics = {"my_topic_name"},
partitions = 1,
brokerProperties = {"log.dir=target/kafka"}
)