Как создать подписку DGS GraphQL на тему ActiveMQ

У меня немного сложный стек технологий. Я использую Netflix DGS для предоставления сервиса GraphQL. За кулисами скрывается куча компонентов JMS, отправляющих и получающих данные от различных сервисов. У меня все работает вне подписки на GraphQL.

В частности, я пытаюсь создать подписку GraphQL для сообщений из раздела ActiveMQ.

Итак, у меня есть SubscriptionDataFetcher следующим образом:

      @DgsComponent
public class SurveyResultsSubscriptionDataFetcher {

    private final Publisher<SurveyResult> surveyResultsReactiveSource;

    @Autowired
    public SurveyResultsSubscriptionDataFetcher(Publisher<SurveyResult> surveyResultsReactiveSource) {
        this.surveyResultsReactiveSource = surveyResultsReactiveSource;
    }

    @DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
    public Publisher<SurveyResult> surveyResults() {
        return surveyResultsReactiveSource;
    }
}

Внутри моей конфигурации Spring я использую следующий поток интеграции Spring:

      @Bean
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public Publisher<SurveyResult> surveyResultsReactiveSource() {
    SurveyResultMessageConverter converter = new SurveyResultMessageConverter();

    return Flux.from(
            IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
                    .log(LoggingHandler.Level.DEBUG)
                    .log()
                    .toReactivePublisher())
            .map((message) -> converter.fromMessage(message, SurveyResult.class));
}

Скажу несколько вещей:

  1. У меня есть отдельный @JmsListener который получает эти сообщения не по теме
  2. Я не вижу более одного потребителя, даже после того, как установлено соединение через веб-сокет.
  3. Если я подключу репозиторий данных Mongo Reactive Spring к этой подписке GraphQL, данные будут получены клиентом.

Когда я подключаю клиента к подписке, я вижу следующие логи:

      PublishSubscribeChannel      : Channel 'unknown.channel.name' has 1 subscriber(s).
DgsWebSocketHandler          : Subscription started for 1

Я подозреваю, что контейнер слушателя сообщений не активируется при установке соединения через веб-сокет. Должен ли я «активировать» адаптер канала? Что мне не хватает?

Стек технологий:

          // spring boots - version 2.4.3
    implementation "org.springframework.boot:spring-boot-starter-web"
    implementation "org.springframework.boot:spring-boot-starter-activemq"
    implementation "org.springframework.boot:spring-boot-starter-data-mongodb-reactive"
    implementation "org.springframework.boot:spring-boot-starter-integration"
    implementation 'org.springframework.boot:spring-boot-starter-security'

    // spring integration
    implementation group: 'org.springframework.integration', name: 'spring-integration-jms', version: '5.4.4'

    // dgs
    implementation "com.netflix.graphql.dgs:graphql-dgs-spring-boot-starter:3.10.2"
    implementation 'com.netflix.graphql.dgs:graphql-dgs-subscriptions-websockets-autoconfigure:3.10.2'

Обновление 1:

Как бы то ни было, если я обновлю подписку до следующего, я получу результаты на стороне клиента.

          @DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
    public Publisher<SurveyResult> surveyResults() {
        // repository is a ReactiveMongoRepository
        return repository.findAll();
    }

1 ответ

Your problem is here:

      return Flux.from(
        IntegrationFlows.from(

And the framework just doesn't see that inner instance to parse and register properly.

To make it working you need to consider to declare that as a top-level bean.

Something like this:

      @Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {  
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
                    .log(LoggingHandler.Level.DEBUG)
                    .transform([transform to SurveyResult])
                    .toReactivePublisher();
}

Now the framework knows that this logical IntegrationFlow container has to be parsed and all the beans have to be registered and started.

You probably need to rethink your SurveyResultMessageConverter logic to a plain transform() if you can't supply a Jms.messageDrivenChannelAdapter with your converter.

Then in your SurveyResultsSubscriptionDataFetcher you just need to have:

       return surveyResultsReactiveSource.map(Message::getPayload);
Другие вопросы по тегам