Интеграция Spring Kafka с Embedded Kafka

У меня есть приложение на Spring Boot, которое получает и отправляет сообщения в разные топики Kafka. Сейчас я пытаюсь написать интеграционный тест, используя встроенный брокер (Embedded Kafka) из Spring Kafka.

При запуске теста возникает следующая ошибка:

Caused by: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549)
at org.springframework.test.context.support.AbstractGenericContextLoader.loadContext(AbstractGenericContextLoader.java:128)
at org.springframework.test.context.support.AbstractGenericContextLoader.loadContext(AbstractGenericContextLoader.java:60)
at org.springframework.test.context.support.AbstractDelegatingSmartContextLoader.delegateLoading(AbstractDelegatingSmartContextLoader.java:107)
at org.springframework.test.context.support.AbstractDelegatingSmartContextLoader.loadContext(AbstractDelegatingSmartContextLoader.java:251)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
... 25 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:789)

Ниже приведены классы, которые я использую.

Consumer Config class

@Configuration
@EnableKafka
public class ConsumerConfig {

@Value("${kafka.bootstrap-servers}")
private String bootstraServers;

@Value("${kafka.consumer.group-id}")
private String groupId;

@Bean
public Map<String, Object> consumerProps() {

    Map<String, Object> props = new HashMap<>();

    // list of host:port pairs for initial connection
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstraServers);

    // key-value deserializers
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);

    // group id
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

    // reset offset
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    /*
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
     */

    return props;

}

@Bean
public ConsumerFactory<String, User> consumerFactory(){
    return new DefaultKafkaConsumerFactory<>(consumerProps(),
            new StringDeserializer(), new AvroDeserializer<>(User.class));
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, User>> 
    kafkaListenerContainerFactory(){

    ConcurrentKafkaListenerContainerFactory<String, User> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1);
    factory.setConsumerFactory(consumerFactory());

    return factory;
}

@Bean
public Consumer consumer() {
    return new Consumer();
}
}

Класс Consumer

/**
 * Kafka listener that logs every received {@link User} record and counts down a latch so
 * that a test can wait for the first message.
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet - presumably an SLF4J logger
 * (for example via Lombok's {@code @Slf4j}); confirm it exists in the real class.
 */
@Component
public class Consumer {

    // testing convenience. It signals that a message is received.
    // NOT FOR PRODUCTION
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * @return the latch that is counted down once per received message
     */
    public CountDownLatch getLatch() {
        return latch;
    }

    /**
     * Handles one record from the topic configured by {@code kafka.topic}.
     *
     * <p>The group id is read from {@code kafka.consumer.group-id}, the same property key the
     * {@code ConsumerConfig} class uses, so only one key has to be defined.
     *
     * @param user      the deserialized record value
     * @param offset    the record offset; declared as {@code long} because Kafka offsets are
     *                  64-bit values
     * @param partition the partition the record was read from
     * @param topic     the topic the record was read from
     */
    @KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.consumer.group-id}")
    public void onReceiving(User user,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        log.info("Processing topic = {}, partition = {}, offset = {}, user= {}",
                topic, partition, offset, user);
        latch.countDown();
    }
}

**Класс ProducerConfig**

@Configuration
public class ProducerConfig {

@Value("${kafka.bootstrap-servers}")
private String bootstraServers;

@Value("${kafka.topic}")
private String topic;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();

    // list of host:port pairs used for establishing the initial connections to the
    // Kafka cluster
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstraServers);

    // Key-Value Serializers
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);

    return props;
}

@Bean
public ProducerFactory<String, User> producerFactory(){
    return new DefaultKafkaProducerFactory<String, User>(producerConfigs());
}

@Bean
public KafkaTemplate<String, User> kafkaTemplate(){
    KafkaTemplate<String, user> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic(topic);

    return kafkaTemplate;
}

// TODO: create bean for the producer
@Bean
public Producer producer() {
    return new Producer();
}
}

Класс Producer

/**
 * Sends {@link User} records to the default topic of the injected {@code KafkaTemplate}.
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet - presumably an SLF4J logger
 * (for example via Lombok's {@code @Slf4j}); confirm it exists in the real class.
 */
@Component
public class Producer {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    /**
     * Sends the given user, keyed by its user id, and blocks until the send completes.
     *
     * @param user the record value; its {@code getUserId()} is used as the record key
     * @return {@code true} when the send completed; failures are reported by exception, never
     *         by a {@code false} return value
     * @throws RuntimeException if the send fails or the waiting thread is interrupted; the
     *         original exception is attached as the cause
     */
    public boolean send(User user) {
        try {
            SendResult<String, User> sendResult =
                    kafkaTemplate.sendDefault(user.getUserId(), user).get();

            RecordMetadata recordMetadata = sendResult.getRecordMetadata();

            log.info("topic = {}, partition = {}, offset = {}, user = {}",
                    recordMetadata.topic(), recordMetadata.partition(),
                    recordMetadata.offset(), user);

            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("error sending the message", e);
        } catch (Exception e) {
            // Pass the exception as the cause so its stack trace is not lost.
            throw new RuntimeException("error sending the message", e);
        }
    }
}

Тестовый класс

public class ProducerConsumerTest {

private static String topic = "user.topic";

@Autowired
private Consumer consumer;

@Autowired
private Producer producer;

@Mock
private User user;

@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, 1, topic);

@Test
public void testReceiver() throws Exception{
    producer.send(user);

    consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);

    assertEquals(0, consumer.getLatch().getCount());
}
}

Можете ли вы помочь мне разобраться, в чём причина этой ошибки?

Можете ли вы также объяснить разницу между аннотацией @EmbeddedKafka и правилом JUnit (@ClassRule) KafkaEmbedded?

0 ответов

Другие вопросы по тегам