Как написать модульный тест для @KafkaListener?

Попытка выяснить, могу ли я написать модульный тест для @KafkaListener, используя spring-kafka и spring-kafka-test.

Мой класс слушателя.

public class MyKafkaListener {
@Autowired
private MyMessageProcessor myMessageProcessor;

@KafkaListener(topics = "${kafka.topic.01}", groupId = "SF.CLIENT", clientIdPrefix = "SF.01", containerFactory = "myMessageListenerContainerFactory")
public void myMessageListener(MyMessage message) {
    myMessageProcessor.process(message);
    log.info("MyMessage processed");
}}

Мой тестовый класс:

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = {"I1.Topic.json.001"})
@ContextConfiguration(classes = {TestKafkaConfig.class})
public class MyMessageConsumersTest {

@Autowired
private MyMessageProcessor myMessageProcessor;

@Value("${kafka.topic.01}")
private String TOPIC_01;

@Autowired
private KafkaTemplate<String, MyMessage> messageProducer;

@Test
public void testSalesforceMessageListner() {
    MyMessageConsumers myMessageConsumers = new MyMessageConsumers(mockService);
    messageProducer.send(TOPIC_01, "MessageID", new MyMessage());
    verify(myMessageProcessor, times(1)).process(any(MyMessage.class));
}}

Мой тестовый конфигурационный класс:

    @Configuration
    @EnableKafka
    public class TestKafkaConfig {
    @Bean
    public MyMessageProcessor myMessageProcessor() {
        return mock(MyMessageProcessor.class);
    }
    @Bean
    public KafkaEmbedded kafkaEmbedded() {
        return new KafkaEmbedded(1, true, 1, "I1.Topic.json.001");
    }

    //Consumer
    @Bean
    public ConsumerFactory<String, MyMessage> myMessageConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyMessage.class));
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MyMessage> myMessageListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(myMessageConsumerFactory());
        return factory;
    }

    //Producer
    @Bean
    public ProducerFactory<String, MyMessage> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMessageSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
    @Bean
    public KafkaTemplate<String, MyMessage> messageProducer() {
        return new KafkaTemplate<>(producerFactory());
    }
    }

Есть ли простой способ сделать эту работу?

Или я должен провести тестирование @KafkaListener другим способом? В модульном тесте, как убедиться, что @KafkaListener вызывается, когда в Кафку поступает новое сообщение.

5 ответов

Как я могу убедиться, что @KafkaListener вызывается при получении нового сообщения в Kafka.

Что ж, по сути это ответственность Framework за тестирование такой функциональности. В вашем случае вам нужно просто сконцентрироваться на бизнес-логике и модульном тестировании именно вашего пользовательского кода, но не того, который скомпилирован в Framework. Кроме того, нет смысла проверять @KafkaListener метод, который просто регистрирует входящие сообщения. Определенно будет слишком сложно найти хук для проверки в тестовом случае.

С другой стороны, я действительно верю, что бизнес-логика в вашем @KafkaListener Метод намного сложнее, чем вы показываете. Таким образом, было бы действительно лучше проверить ваш пользовательский код (например, вставка БД, какой-либо другой вызов службы и т. Д.), Вызванный из этого метода, а не пытаться выяснить ловушку точно для myMessageListener(),

Что вы делаете с mock(MyMessageProcessor.class) действительно хороший способ проверки бизнес-логики. Только то, что не так в вашем коде, касается этого дублирования для EmbeddedKafka: вы используете аннотацию и также объявляете @Bean в конфиге. Вы должны подумать об удалении одного из них. Хотя не ясно, где находится ваш производственный код, который действительно свободен от встроенного Kafka. В противном случае, если все находится в области тестирования, я не вижу проблем с конфигурацией фабрики вашего потребителя и производителя. У вас определенно есть минимально возможный конфиг для @KafkaListener а также KafkaTemplate, Единственное, что вам нужно, это удалить @EmbeddedKafka не запускайте брокера дважды.

Вы можете обернуть слушателя в ваш тестовый набор.

Дано

@SpringBootApplication
public class So52783066Application {

    public static void main(String[] args) {
        SpringApplication.run(So52783066Application.class, args);
    }

    @KafkaListener(id = "so52783066", topics = "so52783066")
    public void listen(String in) {
        System.out.println(in);
    }

}

затем

@RunWith(SpringRunner.class)
@SpringBootTest
public class So52783066ApplicationTests {

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "so52783066");

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private KafkaTemplate<String, String> template;

    @Before
    public void setup() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void test() throws Exception {
        ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry
                .getListenerContainer("so52783066");
        container.stop();
        @SuppressWarnings("unchecked")
        AcknowledgingConsumerAwareMessageListener<String, String> messageListener = (AcknowledgingConsumerAwareMessageListener<String, String>) container
                .getContainerProperties().getMessageListener();
        CountDownLatch latch = new CountDownLatch(1);
        container.getContainerProperties()
                .setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, String>() {

                    @Override
                    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment,
                            Consumer<?, ?> consumer) {
                        messageListener.onMessage(data, acknowledgment, consumer);
                        latch.countDown();
                    }

                });
        container.start();
        template.send("so52783066", "foo");
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    }

}

Вот мое рабочее решение для потребителя, основанное на вашем коде. Спасибо:-)

Конфигурация следующая:

@TestConfiguration
@EnableKafka
@Profile("kafka_test")
public class KafkaTestConfig {

   private static Logger log = LoggerFactory.getLogger(KafkaTestConfig.class);

   @Value("${spring.kafka.bootstrap-servers}")
   private String bootstrapServers;

   @Bean
   @Primary
   public Map<String, Object> consumerConfigs() {
   Map<String, Object> props = new HashMap<>();
   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
   props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
   props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
   props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);

   log.info("Consumer TEST config = {}", props);
   return props;
  }

 @Bean
 public Map<String, Object> producerConfigs() {
     Map<String, Object> props = new HashMap<>();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

     log.info("Producer TEST config = {}", props);
      return props;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
      return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new 
      StringDeserializer(),
      new JsonDeserializer<String>());
  }

@Bean
 public ProducerFactory<String, String> producerFactory() {
   DefaultKafkaProducerFactory<String, String> pf =
      new DefaultKafkaProducerFactory<>(producerConfigs());
   return pf;
 }

 @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> 
 kafkaListenerContainerFactory(
   ConsumerFactory<String, String> kafkaConsumerFactory) {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
     new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.getContainerProperties().setAckOnError(false);
  factory.setConcurrency(2);
  return factory;
}

 @Bean
 public KafkaTemplate<String, String> kafkaTemplate() {
  KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<> 
 (producerFactory());
   return kafkaTemplate;
 }

@Bean
public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
  KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry =
      new KafkaListenerEndpointRegistry();
  return kafkaListenerEndpointRegistry;
}

}

Поместите все компоненты, которые нужно включить в тест, в другой класс:

@TestConfiguration
@Profile("kafka_test")
@EnableKafka
 public class KafkaBeansConfig {

 @Bean
  public MyProducer myProducer() {
  return new MyProducer();
 }

  // more beans
  }

Я создал класс BaseKafkaConsumerTest для его повторного использования:

 @ExtendWith(SpringExtension.class)
 @TestPropertySource(
     properties = {"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
@TestInstance(Lifecycle.PER_CLASS)
@DirtiesContext
@ContextConfiguration(classes = KafkaTestConfig.class)
@ActiveProfiles("kafka_test")
public class BaseKafkaConsumerTest {

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    @Value("${spring.embedded.kafka.brokers}")
    private String brokerAddresses;

    @Autowired
    protected KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    protected KafkaTemplate<String, String> senderTemplate;

    public void setUp() {
        embeddedKafka.brokerProperty("controlled.shutdown.enable", true);

        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
        .getListenerContainers()) {
        System.err.println(messageListenerContainer.getContainerProperties().toString());
        ContainerTestUtils.waitForAssignment(messageListenerContainer,
            embeddedKafka.getPartitionsPerTopic());
    }
    }

    @AfterAll
    public void tearDown() {
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
        .getListenerContainers()) {
        messageListenerContainer.stop();
    }

    embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
    embeddedKafka.getKafkaServers().forEach(b -> b.awaitShutdown());
    }

}

Расширьте базовый класс, чтобы обуздать своего потребителя:

@EmbeddedKafka(topics = MyConsumer.TOPIC_NAME)
@Import(KafkaBeansConfig.class)
public class MYKafkaConsumerTest extends BaseKafkaConsumerTest {

    private static Logger log = LoggerFactory.getLogger(PaymentMethodsKafkaConsumerTest.class);


    @Autowired
    private MyConsumer myConsumer;

        //mocks with @MockBean

    @Configuration
    @ComponentScan({"com.myfirm.kafka"})
    static class KafkaLocalTestConfig  {
    }

    @BeforeAll
    public void setUp() {
        super.setUp();
    }


    @Test
    public void testMessageIsReceived() throws Exception {

    //mocks

    String jsonPayload = "{\"id\":\"12345\","cookieDomain\":"helloworld"}";
    ListenableFuture<SendResult<String, String>> future =
        senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);

    Thread.sleep(10000);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
    public void onSuccess(SendResult<String, String> result) {
        log.info("successfully sent message='{}' with offset={}", jsonPayload,
            result.getRecordMetadata().offset());
    }

    @Override
    public void onFailure(Throwable ex) {
        log.error("unable to send message='{}'", jsonPayload, ex);
    }
    });

    Mockito.verify(myService, Mockito.times(1))
    .update(Mockito.any(MyDetails.class));
    }

Как я читал в других постах, не проверяйте бизнес-логику таким образом. Просто звонки сделаны.

Если вы хотите писать интеграционные тесты, используяEmbeddedKafka, то вы можете сделать что-то вроде этого. Предположим, у нас есть некоторыеKafkaListener, который принимаетRequestDtoкакPayload.

В своем тестовом классе вы должны создатьTestConfigurationдля создания bean-компонентов-производителей и автоматического связыванияKafkaTemplateв свой тест. Также обратите внимание, что вместо автоматического подключения потребителя мы внедряем потребителя.SpyBean.

ВsomeTestмы создаем защелку и настраиваем метод слушателя-потребителя так, чтобы при его вызове защелка открывалась, а утверждения выполнялись только после того, как слушатель получил полезную нагрузку.

Также обратите вниманиеany() ?: RequestDto()линия. Вы должны использовать оператор elvis только в том случае, если вы используете Mockito с ненулевыми аргументами метода Kotlin, потому чтоany()сначала возвращает ноль.

      @EnableKafka
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@EmbeddedKafka(partitions = 10, brokerProperties = ["listeners=PLAINTEXT://localhost:9092", "port=9092"])
class KafkaIgniteApplicationTests {

    @SpyBean
    private lateinit var consumer: Consumer

    @TestConfiguration
    class Config {

        @Value("\${spring.kafka.consumer.bootstrap-servers}")
        private lateinit var servers: String

        fun producerConfig(): Map<String, Any> {
            val props = mutableMapOf<String, Any>()
            props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
            props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            return props
        }

        @Bean
        fun producerFactory(): ProducerFactory<String, String> {
            return DefaultKafkaProducerFactory(producerConfig())
        }

        @Bean
        fun kafkaTemplate(producerFactory: ProducerFactory<String, String>): KafkaTemplate<String, String> {
            return KafkaTemplate(producerFactory)
        }
    }

    @Autowired
    private lateinit var kafkaTemplate: KafkaTemplate<String, String>

    @Test
    fun someTest() {
        val lock = CountDownLatch(1)
        `when`(consumer.receive(any() ?: RequestDto())).thenAnswer {
            it.callRealMethod()
            lock.countDown()
        }
        val request = "{\"value\":\"1\"}"
        kafkaTemplate.send(TOPIC, request)
        lock.await(1000, TimeUnit.MILLISECONDS)
        verify(consumer).receive(RequestDto().apply { value = BigDecimal.ONE })
    }
}

В модульном тесте, как я могу гарантировать, что @KafkaListener вызывается, когда новое сообщение поступает в Kafka.

Вместо использованияAwaitilityилиCountDownLatchподход, более простой способ сделать фактический@KafkaListenerБин как мокито-шпион, использующий@SpyBean. Spy в основном позволяет вам записывать все взаимодействия, сделанные с фактическим экземпляром компонента, чтобы вы могли проверить его взаимодействие позже. Вместе с функцией проверки тайм-аута mockito вы можете гарантировать, что проверка будет выполняться снова и снова до определенного тайм-аута после того, как производитель отправит сообщение.

Что-то вроде :

      @SpringBootTest(properties = {"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
@EmbeddedKafka(topics = {"fooTopic"})
public class MyMessageConsumersTest {

   @SpyBean
   private MyKafkaListener myKafkaListener;

   @Captor
   private ArgumentCaptor<MyMessage> myMessageCaptor;

   @Test
   public void test(){

      //create KafkaTemplate to send some message to the topic...

       verify(myKafkaListener, timeout(5000)). myMessageListener(myMessageCaptor.capture());
       
      //assert the KafkaListener is configured correctly such that it is invoked with the expected parameter
       assertThat(myMessageCaptor.getValue()).isEqualTo(xxxxx);
      
   }
Другие вопросы по тегам