Получить данные из темы после нажатия на @EmbeddedKafka в весенней загрузке Junit

Я пишу тестовые примеры Junit (используя @EmbeddedKafka) для своего приложения Spring Boot, которое широко использует Spring-kafka для связи с другими сервисами и для других операций.

Один типичный случай - удаление данных из kafka (что мы делаем с отправкой пустого сообщения в kafka).

В настоящее время в методе delete() мы делаем это, сначала проверяя, существует ли какое-либо сообщение в kafka, которое запрашивается для удаления. Затем мы нажимаем ноль для этого ключа сообщения в Кафке

Последовали шаги в написании Junit для вышеуказанной логики метода.

@Test
public void test(){
   //Push a message to Kafka (id=1234)
   //call test method service.delete(1234);
       //internally service.delete(1234) checks/validate whether message exists in kafka and then push null to delete topic.
  //check delete topic for delete message received.
  // Assertions
}

Проблема здесь в том, что Кафка всегда выдает сообщение не найденное исключение. внутри метода service.delete().

при проверке логов в консоли. я выяснил, что мой производитель-config использует другой порт для kafka, а потребительский config использует другой порт.

Я не уверен, упустил ли я какие-то мелкие детали или в чем причина такого поведения. Любая помощь будет оценена.

1 ответ

У меня есть простое приложение Spring Boot, которое вы можете рассмотреть:

@SpringBootApplication
public class SpringBootEmbeddedKafkaApplication {

    public static final String MY_TOPIC = "myTopic";

    public BlockingQueue<String> kafkaMessages = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        SpringApplication.run(SpringBootEmbeddedKafkaApplication.class, args);
    }

    @KafkaListener(topics = MY_TOPIC)
    public void listener(String payload) {
        this.kafkaMessages.add(payload);
    }

}

application.properties:

spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest

И проверить:

@RunWith(SpringRunner.class)
@SpringBootTest(properties =
        "spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
@EmbeddedKafka(topics = SpringBootEmbeddedKafkaApplication.MY_TOPIC)
public class SpringBootEmbeddedKafkaApplicationTests {

    @Autowired
    private KafkaTemplate<Object, String> kafkaTemplate;

    @Autowired
    private SpringBootEmbeddedKafkaApplication kafkaApplication;

    @Test
    public void testListenerWithEmbeddedKafka() throws InterruptedException {
        String testMessage = "foo";
        this.kafkaTemplate.send(SpringBootEmbeddedKafkaApplication.MY_TOPIC, testMessage);

        assertThat(this.kafkaApplication.kafkaMessages.poll(10, TimeUnit.SECONDS)).isEqualTo(testMessage);
    }

}

Обратите внимание на spring.kafka.consumer.auto-offset-reset=earliest позволить потребителю читать с начала раздела.

Также еще один важный вариант для применения в тесте:

@SpringBootTest(properties =
        "spring.kafka.bootstrapServers:${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")

@EmbeddedKafka заселяет spring.embedded.kafka.brokers Системное свойство и сделайте автоконфигурацию Spring Boot, чтобы узнать о том, что нам нужно скопировать его значение в spring.kafka.bootstrapServers свойство конфигурации.

Или другой вариант в соответствии с нашими документами:

static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
Другие вопросы по тегам