MessageDispatchingException: у Dispatcher нет подписчиков
Имея простую настройку Spring Cloud Stream.
Интерфейс
public interface MyKafkaBinding {
@Output(PUBLISHER)
MessageChannel publisher();
@Input("subscriber")
SubscribableChannel subscriber();
}
Привязка
@EnableBinding(MyKafkaBinding.class)
Слушатель
@StreamListener(MyKafkaBinding.PUBLISHER)
public void listen(MyEvent message) {
// handle
}
Свойства приложения
spring.cloud.stream.bindings.publisher.destination=my-kafka-topic
spring.cloud.stream.bindings.publisher.producer.header-mode=headers
spring.cloud.stream.bindings.publisher.content-type=application/json
spring.cloud.stream.bindings.subscriber.destination=my-kafka-topic
spring.cloud.stream.bindings.subscriber.consumer.header-mode=headers
spring.cloud.stream.bindings.subscriber.content-type=application/json
Все работает нормально. Сообщения, отправленные через издателя, принимаются.
Теперь я пытаюсь отправить сообщение в эту тему из другого приложения, используя KafkaTemplate:
kafkaTemplate.send(topic, message)
На этот раз на принимающей стороне выдается ошибка:
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'MyApp.subscriber'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=com.bax.so.MyEvent@6da11fec, headers={b3=[B@304c5b9f, kafka_offset=10, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@742c6888, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=my-kafka-topic, kafka_receivedTimestamp=1578085559878, kafka_groupId=my-default-group-id}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
... 8 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
... 27 common frames omitted
Весенняя версия 5+.
Это вообще допустимый сценарий, отправка сообщений с использованием KafkaTemplate и ожидание их получения подписчиком облачного потока?
2 ответа
Решение
Ваш @StreamListener
привязан к каналу издателя, а не к каналу подписчика.
Вот рабочий пример:
@SpringBootApplication
@EnableBinding(MyKafkaBinding.class)
public class So59585815Application {
public static void main(String[] args) {
SpringApplication.run(So59585815Application.class, args);
}
@Autowired
private MessageChannel publisher;
@StreamListener("subscriber")
public void listen(String in) {
publisher.send(new GenericMessage<>(in.toUpperCase()));
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
template.send("subscriber-topic", "foo".getBytes());
};
}
@KafkaListener(id = "listener", topics = "publisher-topic")
public void listen(byte[] in) {
System.out.println(new String(in));
}
}
interface MyKafkaBinding {
@Output("publisher")
MessageChannel publisher();
@Input("subscriber")
SubscribableChannel subscriber();
}
а также
spring.cloud.stream.bindings.publisher.destination=publisher-topic
spring.cloud.stream.bindings.subscriber.destination=subscriber-topic
spring.cloud.stream.bindings.subscriber.group=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
Причина этой проблемы в моем проекте:
мой проект не может подключиться к Kafka