Как написать модульный тест для @KafkaListener?
Попытка выяснить, могу ли я написать модульный тест для @KafkaListener, используя spring-kafka и spring-kafka-test.
Мой класс слушателя.
public class MyKafkaListener {
@Autowired
private MyMessageProcessor myMessageProcessor;
@KafkaListener(topics = "${kafka.topic.01}", groupId = "SF.CLIENT", clientIdPrefix = "SF.01", containerFactory = "myMessageListenerContainerFactory")
public void myMessageListener(MyMessage message) {
myMessageProcessor.process(message);
log.info("MyMessage processed");
}}
Мой тестовый класс:
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = {"I1.Topic.json.001"})
@ContextConfiguration(classes = {TestKafkaConfig.class})
public class MyMessageConsumersTest {
@Autowired
private MyMessageProcessor myMessageProcessor;
@Value("${kafka.topic.01}")
private String TOPIC_01;
@Autowired
private KafkaTemplate<String, MyMessage> messageProducer;
@Test
public void testSalesforceMessageListner() {
MyMessageConsumers myMessageConsumers = new MyMessageConsumers(mockService);
messageProducer.send(TOPIC_01, "MessageID", new MyMessage());
verify(myMessageProcessor, times(1)).process(any(MyMessage.class));
}}
Мой тестовый конфигурационный класс:
@Configuration
@EnableKafka
public class TestKafkaConfig {
@Bean
public MyMessageProcessor myMessageProcessor() {
return mock(MyMessageProcessor.class);
}
@Bean
public KafkaEmbedded kafkaEmbedded() {
return new KafkaEmbedded(1, true, 1, "I1.Topic.json.001");
}
//Consumer
@Bean
public ConsumerFactory<String, MyMessage> myMessageConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyMessage.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyMessage> myMessageListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(myMessageConsumerFactory());
return factory;
}
//Producer
@Bean
public ProducerFactory<String, MyMessage> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMessageSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, MyMessage> messageProducer() {
return new KafkaTemplate<>(producerFactory());
}
}
Есть ли простой способ сделать эту работу?
Или я должен провести тестирование @KafkaListener другим способом? В модульном тесте, как убедиться, что @KafkaListener вызывается, когда в Кафку поступает новое сообщение.
5 ответов
Как я могу убедиться, что @KafkaListener вызывается при получении нового сообщения в Kafka.
Что ж, по сути это ответственность Framework за тестирование такой функциональности. В вашем случае вам нужно просто сконцентрироваться на бизнес-логике и модульном тестировании именно вашего пользовательского кода, но не того, который скомпилирован в Framework. Кроме того, нет смысла проверять @KafkaListener
метод, который просто регистрирует входящие сообщения. Определенно будет слишком сложно найти хук для проверки в тестовом случае.
С другой стороны, я действительно верю, что бизнес-логика в вашем @KafkaListener
Метод намного сложнее, чем вы показываете. Таким образом, было бы действительно лучше проверить ваш пользовательский код (например, вставка БД, какой-либо другой вызов службы и т. Д.), Вызванный из этого метода, а не пытаться выяснить ловушку точно для myMessageListener()
,
Что вы делаете с mock(MyMessageProcessor.class)
действительно хороший способ проверки бизнес-логики. Только то, что не так в вашем коде, касается этого дублирования для EmbeddedKafka
: вы используете аннотацию и также объявляете @Bean
в конфиге. Вы должны подумать об удалении одного из них. Хотя не ясно, где находится ваш производственный код, который действительно свободен от встроенного Kafka. В противном случае, если все находится в области тестирования, я не вижу проблем с конфигурацией фабрики вашего потребителя и производителя. У вас определенно есть минимально возможный конфиг для @KafkaListener
а также KafkaTemplate
, Единственное, что вам нужно, это удалить @EmbeddedKafka
не запускайте брокера дважды.
Вы можете обернуть слушателя в ваш тестовый набор.
Дано
@SpringBootApplication
public class So52783066Application {
public static void main(String[] args) {
SpringApplication.run(So52783066Application.class, args);
}
@KafkaListener(id = "so52783066", topics = "so52783066")
public void listen(String in) {
System.out.println(in);
}
}
затем
@RunWith(SpringRunner.class)
@SpringBootTest
public class So52783066ApplicationTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "so52783066");
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private KafkaTemplate<String, String> template;
@Before
public void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
}
@Test
public void test() throws Exception {
ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry
.getListenerContainer("so52783066");
container.stop();
@SuppressWarnings("unchecked")
AcknowledgingConsumerAwareMessageListener<String, String> messageListener = (AcknowledgingConsumerAwareMessageListener<String, String>) container
.getContainerProperties().getMessageListener();
CountDownLatch latch = new CountDownLatch(1);
container.getContainerProperties()
.setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
messageListener.onMessage(data, acknowledgment, consumer);
latch.countDown();
}
});
container.start();
template.send("so52783066", "foo");
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}
}
Вот мое рабочее решение для потребителя, основанное на вашем коде. Спасибо:-)
Конфигурация следующая:
@TestConfiguration
@EnableKafka
@Profile("kafka_test")
public class KafkaTestConfig {
private static Logger log = LoggerFactory.getLogger(KafkaTestConfig.class);
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
@Primary
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
log.info("Consumer TEST config = {}", props);
return props;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
log.info("Producer TEST config = {}", props);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new
StringDeserializer(),
new JsonDeserializer<String>());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
DefaultKafkaProducerFactory<String, String> pf =
new DefaultKafkaProducerFactory<>(producerConfigs());
return pf;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory(
ConsumerFactory<String, String> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.setConcurrency(2);
return factory;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>
(producerFactory());
return kafkaTemplate;
}
@Bean
public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry =
new KafkaListenerEndpointRegistry();
return kafkaListenerEndpointRegistry;
}
}
Поместите все компоненты, которые нужно включить в тест, в другой класс:
@TestConfiguration
@Profile("kafka_test")
@EnableKafka
public class KafkaBeansConfig {
@Bean
public MyProducer myProducer() {
return new MyProducer();
}
// more beans
}
Я создал класс BaseKafkaConsumerTest для его повторного использования:
@ExtendWith(SpringExtension.class)
@TestPropertySource(
properties = {"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
@TestInstance(Lifecycle.PER_CLASS)
@DirtiesContext
@ContextConfiguration(classes = KafkaTestConfig.class)
@ActiveProfiles("kafka_test")
public class BaseKafkaConsumerTest {
@Autowired
protected EmbeddedKafkaBroker embeddedKafka;
@Value("${spring.embedded.kafka.brokers}")
private String brokerAddresses;
@Autowired
protected KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
protected KafkaTemplate<String, String> senderTemplate;
public void setUp() {
embeddedKafka.brokerProperty("controlled.shutdown.enable", true);
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
.getListenerContainers()) {
System.err.println(messageListenerContainer.getContainerProperties().toString());
ContainerTestUtils.waitForAssignment(messageListenerContainer,
embeddedKafka.getPartitionsPerTopic());
}
}
@AfterAll
public void tearDown() {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
.getListenerContainers()) {
messageListenerContainer.stop();
}
embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
embeddedKafka.getKafkaServers().forEach(b -> b.awaitShutdown());
}
}
Расширьте базовый класс, чтобы обуздать своего потребителя:
@EmbeddedKafka(topics = MyConsumer.TOPIC_NAME)
@Import(KafkaBeansConfig.class)
public class MYKafkaConsumerTest extends BaseKafkaConsumerTest {
private static Logger log = LoggerFactory.getLogger(PaymentMethodsKafkaConsumerTest.class);
@Autowired
private MyConsumer myConsumer;
//mocks with @MockBean
@Configuration
@ComponentScan({"com.myfirm.kafka"})
static class KafkaLocalTestConfig {
}
@BeforeAll
public void setUp() {
super.setUp();
}
@Test
public void testMessageIsReceived() throws Exception {
//mocks
String jsonPayload = "{\"id\":\"12345\","cookieDomain\":"helloworld"}";
ListenableFuture<SendResult<String, String>> future =
senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);
Thread.sleep(10000);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("successfully sent message='{}' with offset={}", jsonPayload,
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
log.error("unable to send message='{}'", jsonPayload, ex);
}
});
Mockito.verify(myService, Mockito.times(1))
.update(Mockito.any(MyDetails.class));
}
Как я читал в других постах, не проверяйте бизнес-логику таким образом. Просто звонки сделаны.
Если вы хотите писать интеграционные тесты, используяEmbeddedKafka
, то вы можете сделать что-то вроде этого. Предположим, у нас есть некоторыеKafkaListener
, который принимаетRequestDto
какPayload
.
В своем тестовом классе вы должны создатьTestConfiguration
для создания bean-компонентов-производителей и автоматического связыванияKafkaTemplate
в свой тест. Также обратите внимание, что вместо автоматического подключения потребителя мы внедряем потребителя.SpyBean
.
ВsomeTest
мы создаем защелку и настраиваем метод слушателя-потребителя так, чтобы при его вызове защелка открывалась, а утверждения выполнялись только после того, как слушатель получил полезную нагрузку.
Также обратите вниманиеany() ?: RequestDto()
линия. Вы должны использовать оператор elvis только в том случае, если вы используете Mockito с ненулевыми аргументами метода Kotlin, потому чтоany()
сначала возвращает ноль.
@EnableKafka
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@EmbeddedKafka(partitions = 10, brokerProperties = ["listeners=PLAINTEXT://localhost:9092", "port=9092"])
class KafkaIgniteApplicationTests {
@SpyBean
private lateinit var consumer: Consumer
@TestConfiguration
class Config {
@Value("\${spring.kafka.consumer.bootstrap-servers}")
private lateinit var servers: String
fun producerConfig(): Map<String, Any> {
val props = mutableMapOf<String, Any>()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return props
}
@Bean
fun producerFactory(): ProducerFactory<String, String> {
return DefaultKafkaProducerFactory(producerConfig())
}
@Bean
fun kafkaTemplate(producerFactory: ProducerFactory<String, String>): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory)
}
}
@Autowired
private lateinit var kafkaTemplate: KafkaTemplate<String, String>
@Test
fun someTest() {
val lock = CountDownLatch(1)
`when`(consumer.receive(any() ?: RequestDto())).thenAnswer {
it.callRealMethod()
lock.countDown()
}
val request = "{\"value\":\"1\"}"
kafkaTemplate.send(TOPIC, request)
lock.await(1000, TimeUnit.MILLISECONDS)
verify(consumer).receive(RequestDto().apply { value = BigDecimal.ONE })
}
}
В модульном тесте, как я могу гарантировать, что @KafkaListener вызывается, когда новое сообщение поступает в Kafka.
Вместо использованияAwaitility
илиCountDownLatch
подход, более простой способ сделать фактический@KafkaListener
Бин как мокито-шпион, использующий@SpyBean
. Spy в основном позволяет вам записывать все взаимодействия, сделанные с фактическим экземпляром компонента, чтобы вы могли проверить его взаимодействие позже. Вместе с функцией проверки тайм-аута mockito вы можете гарантировать, что проверка будет выполняться снова и снова до определенного тайм-аута после того, как производитель отправит сообщение.
Что-то вроде :
@SpringBootTest(properties = {"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
@EmbeddedKafka(topics = {"fooTopic"})
public class MyMessageConsumersTest {
@SpyBean
private MyKafkaListener myKafkaListener;
@Captor
private ArgumentCaptor<MyMessage> myMessageCaptor;
@Test
public void test(){
//create KafkaTemplate to send some message to the topic...
verify(myKafkaListener, timeout(5000)). myMessageListener(myMessageCaptor.capture());
//assert the KafkaListener is configured correctly such that it is invoked with the expected parameter
assertThat(myMessageCaptor.getValue()).isEqualTo(xxxxx);
}