Как настроить порт в KafkaEmbedded при юнит-тестировании Spring-Kafka-потребителя
Я использую spring-boot-starter-parent
версия 1.5.0.RELEASE
, spring-kafka
версия 1.0.0.RELEASE
а также spring-kafka-test
версия 1.0.0.RELEASE
в приложении, которое потребляет сообщения от Kakfa 0.9
кластер. У меня есть юнит-тест для моего потребителя, который использовал KafkaEmbedded
но это терпит неудачу, так как порт брокера выбирается случайным образом. Есть ли способ установить свойство этого брокера без изменения версии? Или какие версии я должен использовать, чтобы ничего не сломать?
Вот код для KafkaListener
а также KafkaConsumerTest
,
Listener.java
@Service
public class Listener {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
logger.info(msg);
latch.countDown();
ack.acknowledge();
}
public CountDownLatch getLatch() {
return latch;
}
}
KafkaConsumerTest.java (РЕДАКТИРОВАТЬ)
@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
private static String TEST_TOPIC = "topic";
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);
public KafkaTemplate<String, String> template;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private Listener listener;
@Before
public void init(){
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
senderProps.put("key.serializer", StringSerializer.class);
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps);
template = new KafkaTemplate<>(producerFactory);
template.setDefaultTopic(TEST_TOPIC);
}
@Test
public void testConsume() throws Exception {
String record = "message";
template.sendDefault(TEST_TOPIC, record);
logger.debug("test-consume sent record {}", record);
listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
Assert.assertEquals(listener.getLatch().getCount(), 0);
}
}
2 ответа
Пожалуйста, используйте spring-kafka 1.3.9 с загрузкой 1.5; более ранние версии больше не поддерживаются. Текущая версия загрузки 1.5.x - 1.5.21.
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);
static {
embeddedKafka.setKafkaPorts(1234);
}
setKafkaPorts
был доступен с 1.3.
Тем не менее, вы правильно используете выделенный случайный порт в своем тесте
Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
Чтобы заставить слушателя kafka подключаться к встроенному брокеру, вы можете использовать.
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
Я думаю, в то время как контекст приложения загружается для тестирования, два компонента создаются типа (ProducerFactory and KafkaTemplate)
один с оригинальными конфигами, второй с тестовыми конфигами, попробуйте использовать другой профиль для тестов application-test.yml
и добавить свойство переопределения бобов
spring.main.allow-bean-definition-overriding to true.
Чтобы он переопределял компоненты приложения с помощью тестовых компонентов, а также объявлял ProducerFactory
а также KafkaTemplate
как бобы в тесте с тем же именем, что и в приложении