Как настроить порт в KafkaEmbedded при юнит-тестировании Spring-Kafka-потребителя

Я использую spring-boot-starter-parent версия 1.5.0.RELEASE, spring-kafka версия 1.0.0.RELEASE а также spring-kafka-test версия 1.0.0.RELEASE в приложении, которое потребляет сообщения от Kakfa 0.9 кластер. У меня есть юнит-тест для моего потребителя, который использовал KafkaEmbedded но это терпит неудачу, так как порт брокера выбирается случайным образом. Есть ли способ установить свойство этого брокера без изменения версии? Или какие версии я должен использовать, чтобы ничего не сломать?

Вот код для KafkaListener а также KafkaConsumerTest,

Listener.java

@Service
public class Listener {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);
    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
    public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
        logger.info(msg);
        latch.countDown();
        ack.acknowledge();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

KafkaConsumerTest.java (РЕДАКТИРОВАТЬ)

@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
    private static String TEST_TOPIC = "topic";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

    public KafkaTemplate<String, String> template;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    private Listener listener;

    @Before
    public void init(){
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
        Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
        senderProps.put("key.serializer", StringSerializer.class);
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps);
        template = new KafkaTemplate<>(producerFactory);
        template.setDefaultTopic(TEST_TOPIC);
    }

    @Test
    public void testConsume() throws Exception {
        String record = "message";
        template.sendDefault(TEST_TOPIC, record);
        logger.debug("test-consume sent record {}", record);
        listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
        Assert.assertEquals(listener.getLatch().getCount(), 0);
    }
}

2 ответа

Пожалуйста, используйте spring-kafka 1.3.9 с загрузкой 1.5; более ранние версии больше не поддерживаются. Текущая версия загрузки 1.5.x - 1.5.21.

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

static {
    embeddedKafka.setKafkaPorts(1234);
}

setKafkaPorts был доступен с 1.3.

Тем не менее, вы правильно используете выделенный случайный порт в своем тесте

Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());

Чтобы заставить слушателя kafka подключаться к встроенному брокеру, вы можете использовать.

    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString()); 

Я думаю, в то время как контекст приложения загружается для тестирования, два компонента создаются типа (ProducerFactory and KafkaTemplate)один с оригинальными конфигами, второй с тестовыми конфигами, попробуйте использовать другой профиль для тестов application-test.yml и добавить свойство переопределения бобов

spring.main.allow-bean-definition-overriding to true.

Чтобы он переопределял компоненты приложения с помощью тестовых компонентов, а также объявлял ProducerFactory а также KafkaTemplate как бобы в тесте с тем же именем, что и в приложении

Другие вопросы по тегам