MessageDispatchingException: у Dispatcher нет подписчиков

Имея простую настройку Spring Cloud Stream.

Интерфейс

public interface MyKafkaBinding {

    @Output(PUBLISHER)
    MessageChannel publisher();

    @Input("subscriber")
    SubscribableChannel subscriber();
}

Привязка

@EnableBinding(MyKafkaBinding.class)

Слушатель

@StreamListener(MyKafkaBinding.PUBLISHER)
public void listen(MyEvent message) {
    // handle
}

Свойства приложения

spring.cloud.stream.bindings.publisher.destination=my-kafka-topic
spring.cloud.stream.bindings.publisher.producer.header-mode=headers
spring.cloud.stream.bindings.publisher.content-type=application/json

spring.cloud.stream.bindings.subscriber.destination=my-kafka-topic
spring.cloud.stream.bindings.subscriber.consumer.header-mode=headers
spring.cloud.stream.bindings.subscriber.content-type=application/json

Все работает нормально. Сообщения, отправленные через издателя, принимаются.
Теперь я пытаюсь отправить сообщение в эту тему из другого приложения, используя KafkaTemplate:

kafkaTemplate.send(topic, message)

На этот раз на принимающей стороне выдается ошибка:

Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'MyApp.subscriber'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=com.bax.so.MyEvent@6da11fec, headers={b3=[B@304c5b9f, kafka_offset=10, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@742c6888, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=my-kafka-topic, kafka_receivedTimestamp=1578085559878, kafka_groupId=my-default-group-id}]
   at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
   at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
   at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
   at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
   at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
   at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
   at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
   at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
   at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369)
   at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74)
   at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
   at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402)
   at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
   at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
   at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
   at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
   at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
   ... 8 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
   at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138)
   at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
   at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
   ... 27 common frames omitted

Весенняя версия 5+.
Это вообще допустимый сценарий, отправка сообщений с использованием KafkaTemplate и ожидание их получения подписчиком облачного потока?

2 ответа

Решение

Ваш @StreamListener привязан к каналу издателя, а не к каналу подписчика.

Вот рабочий пример:

@SpringBootApplication
@EnableBinding(MyKafkaBinding.class)
public class So59585815Application {

    public static void main(String[] args) {
        SpringApplication.run(So59585815Application.class, args);
    }

    @Autowired
    private MessageChannel publisher;

    @StreamListener("subscriber")
    public void listen(String in) {
        publisher.send(new GenericMessage<>(in.toUpperCase()));
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("subscriber-topic", "foo".getBytes());
        };
    }

    @KafkaListener(id = "listener", topics = "publisher-topic")
    public void listen(byte[] in) {
        System.out.println(new String(in));
    }

}

interface MyKafkaBinding {

    @Output("publisher")
    MessageChannel publisher();

    @Input("subscriber")
    SubscribableChannel subscriber();

}

а также

spring.cloud.stream.bindings.publisher.destination=publisher-topic
spring.cloud.stream.bindings.subscriber.destination=subscriber-topic
spring.cloud.stream.bindings.subscriber.group=myGroup

spring.kafka.consumer.auto-offset-reset=earliest

Причина этой проблемы в моем проекте:

мой проект не может подключиться к Kafka