Spring Kafka @SendTo не отправляет заголовки
Я отправляю сообщение Кафке, используя ReplyingKafkaTemplate
и он отправляет сообщение с kafka_correlationId
, Тем не менее, когда он попадает в мой @KafkaListener
метод и перенаправляет его в тему ответа, заголовки теряются.
Как мне сохранить заголовки кафки?
Вот моя подпись метода:
@KafkaListener(topics = "input")
@SendTo("reply")
public List<CustomOutput> consume(List<CustomInput> inputs) {
... /* some processing */
return outputs;
}
Я создал ProducerInterceptor
так что я могу видеть, какие заголовки отправляются с ReplyingKafkaTemplate
, а также из @SendTo
аннотаций. Из этого, еще одна странная вещь заключается в том, что ReplyingKafkaTemplate
не добавляет документально kafka_replyTopic
заголовок сообщения.
Вот как ReplyingKafkaTemplate
настроен:
@Bean
public KafkaMessageListenerContainer<Object, Object> replyContainer(ConsumerFactory<Object, Object> cf) {
ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
return new KafkaMessageListenerContainer<>(cf, containerProperties);
}
@Bean
public ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate(ProducerFactory<Object, Object> pf, KafkaMessageListenerContainer<Object, Object> container) {
return new ReplyingKafkaTemplate<>(pf, container);
}
Я не уверен, что это уместно, но я добавил Spring Cloud Sleuth в качестве зависимости, и заголовки span/trace присутствуют при отправке сообщений, но новые создаются при пересылке сообщения.
1 ответ
Произвольные заголовки из сообщения запроса по умолчанию не копируются в ответное сообщение, только kafka_correlationId
,
Начиная с версии 2.2, вы можете настроить ReplyHeadersConfigurer
который вызывается, чтобы определить, какие заголовки должны быть скопированы.
Смотрите документацию.
Начиная с версии 2.2, вы можете добавить
ReplyHeadersConfigurer
слушателю контейнерной фабрики. Это делается для определения того, какие заголовки вы хотите установить в ответном сообщении.
РЕДАКТИРОВАТЬ
Кстати, в 2.2 RKT автоматически устанавливает responseTo, если заголовка нет.
С 2.1.x это можно сделать, но это немного сложнее, и вы должны выполнить часть работы самостоятельно. Ключ должен получить и ответить Message<?>
...
@KafkaListener(id = "so55622224", topics = "so55622224")
@SendTo("dummy.we.use.the.header.instead")
public Message<?> listen(Message<String> in) {
System.out.println(in);
Headers nativeHeaders = in.getHeaders().get(KafkaHeaders.NATIVE_HEADERS, Headers.class);
byte[] replyTo = nativeHeaders.lastHeader(KafkaHeaders.REPLY_TOPIC).value();
byte[] correlation = nativeHeaders.lastHeader(KafkaHeaders.CORRELATION_ID).value();
return MessageBuilder.withPayload(in.getPayload().toUpperCase())
.setHeader("myHeader", nativeHeaders.lastHeader("myHeader").value())
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.setHeader(KafkaHeaders.TOPIC, replyTo)
.build();
}
// This is used to send the reply - needs a header mapper
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); // map all byte[] headers
kafkaTemplate.setMessageConverter(messageConverter);
return kafkaTemplate;
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
Headers headers = new RecordHeaders();
headers.add(new RecordHeader("myHeader", "myHeaderValue".getBytes()));
headers.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "so55622224.replies".getBytes())); // automatic in 2.2
ProducerRecord<String, String> record = new ProducerRecord<>("so55622224", null, null, "foo", headers);
RequestReplyFuture<String, String, String> future = template.sendAndReceive(record);
ConsumerRecord<String, String> reply = future.get();
System.out.println("Reply: " + reply.value() + " myHeader="
+ new String(reply.headers().lastHeader("myHeader").value()));
};
}