Spring Kafka @SendTo не отправляет заголовки

Я отправляю сообщение Кафке, используя ReplyingKafkaTemplate и он отправляет сообщение с kafka_correlationId, Тем не менее, когда он попадает в мой @KafkaListener метод и перенаправляет его в тему ответа, заголовки теряются.

Как мне сохранить заголовки кафки?

Вот моя подпись метода:

@KafkaListener(topics = "input")
@SendTo("reply")
public List<CustomOutput> consume(List<CustomInput> inputs) {
  ... /* some processing */
  return outputs;
}

Я создал ProducerInterceptor так что я могу видеть, какие заголовки отправляются с ReplyingKafkaTemplate, а также из @SendTo аннотаций. Из этого, еще одна странная вещь заключается в том, что ReplyingKafkaTemplate не добавляет документально kafka_replyTopic заголовок сообщения.

Вот как ReplyingKafkaTemplate настроен:

@Bean
public KafkaMessageListenerContainer<Object, Object> replyContainer(ConsumerFactory<Object, Object> cf) {
  ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
  return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

@Bean
public ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate(ProducerFactory<Object, Object> pf, KafkaMessageListenerContainer<Object, Object> container) {
  return new ReplyingKafkaTemplate<>(pf, container);
}

Я не уверен, что это уместно, но я добавил Spring Cloud Sleuth в качестве зависимости, и заголовки span/trace присутствуют при отправке сообщений, но новые создаются при пересылке сообщения.

1 ответ

Решение

Произвольные заголовки из сообщения запроса по умолчанию не копируются в ответное сообщение, только kafka_correlationId,

Начиная с версии 2.2, вы можете настроить ReplyHeadersConfigurer который вызывается, чтобы определить, какие заголовки должны быть скопированы.

Смотрите документацию.

Начиная с версии 2.2, вы можете добавить ReplyHeadersConfigurer слушателю контейнерной фабрики. Это делается для определения того, какие заголовки вы хотите установить в ответном сообщении.

РЕДАКТИРОВАТЬ

Кстати, в 2.2 RKT автоматически устанавливает responseTo, если заголовка нет.

С 2.1.x это можно сделать, но это немного сложнее, и вы должны выполнить часть работы самостоятельно. Ключ должен получить и ответить Message<?>...

@KafkaListener(id = "so55622224", topics = "so55622224")
@SendTo("dummy.we.use.the.header.instead")
public Message<?> listen(Message<String> in) {
    System.out.println(in);
    Headers nativeHeaders = in.getHeaders().get(KafkaHeaders.NATIVE_HEADERS, Headers.class);
    byte[] replyTo = nativeHeaders.lastHeader(KafkaHeaders.REPLY_TOPIC).value();
    byte[] correlation = nativeHeaders.lastHeader(KafkaHeaders.CORRELATION_ID).value();
    return MessageBuilder.withPayload(in.getPayload().toUpperCase())
            .setHeader("myHeader", nativeHeaders.lastHeader("myHeader").value())
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .build();
}

// This is used to send the reply - needs a header mapper
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    MessagingMessageConverter messageConverter = new MessagingMessageConverter();
    messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); // map all byte[] headers
    kafkaTemplate.setMessageConverter(messageConverter);
    return kafkaTemplate;
}

@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
    return args -> {
        Headers headers = new RecordHeaders();
        headers.add(new RecordHeader("myHeader", "myHeaderValue".getBytes()));
        headers.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "so55622224.replies".getBytes())); // automatic in 2.2
        ProducerRecord<String, String> record = new ProducerRecord<>("so55622224", null, null, "foo", headers);
        RequestReplyFuture<String, String, String> future = template.sendAndReceive(record);
        ConsumerRecord<String, String> reply = future.get();
        System.out.println("Reply: " + reply.value() + " myHeader="
                + new String(reply.headers().lastHeader("myHeader").value()));
    };
}
Другие вопросы по тегам