Как создать подписку DGS GraphQL на тему ActiveMQ
У меня немного сложный стек технологий. Я использую Netflix DGS для предоставления сервиса GraphQL. За кулисами скрывается куча компонентов JMS, отправляющих и получающих данные от различных сервисов. У меня все работает вне подписки на GraphQL.
В частности, я пытаюсь создать подписку GraphQL для сообщений из раздела ActiveMQ.
Итак, у меня есть SubscriptionDataFetcher следующим образом:
@DgsComponent
public class SurveyResultsSubscriptionDataFetcher {
private final Publisher<SurveyResult> surveyResultsReactiveSource;
@Autowired
public SurveyResultsSubscriptionDataFetcher(Publisher<SurveyResult> surveyResultsReactiveSource) {
this.surveyResultsReactiveSource = surveyResultsReactiveSource;
}
@DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
public Publisher<SurveyResult> surveyResults() {
return surveyResultsReactiveSource;
}
}
Внутри моей конфигурации Spring я использую следующий поток интеграции Spring:
@Bean
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public Publisher<SurveyResult> surveyResultsReactiveSource() {
SurveyResultMessageConverter converter = new SurveyResultMessageConverter();
return Flux.from(
IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
.log(LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher())
.map((message) -> converter.fromMessage(message, SurveyResult.class));
}
Скажу несколько вещей:
- У меня есть отдельный
@JmsListener
который получает эти сообщения не по теме - Я не вижу более одного потребителя, даже после того, как установлено соединение через веб-сокет.
- Если я подключу репозиторий данных Mongo Reactive Spring к этой подписке GraphQL, данные будут получены клиентом.
Когда я подключаю клиента к подписке, я вижу следующие логи:
PublishSubscribeChannel : Channel 'unknown.channel.name' has 1 subscriber(s).
DgsWebSocketHandler : Subscription started for 1
Я подозреваю, что контейнер слушателя сообщений не активируется при установке соединения через веб-сокет. Должен ли я «активировать» адаптер канала? Что мне не хватает?
Стек технологий:
// spring boots - version 2.4.3
implementation "org.springframework.boot:spring-boot-starter-web"
implementation "org.springframework.boot:spring-boot-starter-activemq"
implementation "org.springframework.boot:spring-boot-starter-data-mongodb-reactive"
implementation "org.springframework.boot:spring-boot-starter-integration"
implementation 'org.springframework.boot:spring-boot-starter-security'
// spring integration
implementation group: 'org.springframework.integration', name: 'spring-integration-jms', version: '5.4.4'
// dgs
implementation "com.netflix.graphql.dgs:graphql-dgs-spring-boot-starter:3.10.2"
implementation 'com.netflix.graphql.dgs:graphql-dgs-subscriptions-websockets-autoconfigure:3.10.2'
Обновление 1:
Как бы то ни было, если я обновлю подписку до следующего, я получу результаты на стороне клиента.
@DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
public Publisher<SurveyResult> surveyResults() {
// repository is a ReactiveMongoRepository
return repository.findAll();
}
1 ответ
Your problem is here:
return Flux.from(
IntegrationFlows.from(
And the framework just doesn't see that inner instance to parse and register properly.
To make it working you need to consider to declare that as a top-level bean.
Something like this:
@Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
.log(LoggingHandler.Level.DEBUG)
.transform([transform to SurveyResult])
.toReactivePublisher();
}
Now the framework knows that this logical
IntegrationFlow
container has to be parsed and all the beans have to be registered and started.
You probably need to rethink your
SurveyResultMessageConverter
logic to a plain
transform()
if you can't supply a
Jms.messageDrivenChannelAdapter
with your converter.
Then in your
SurveyResultsSubscriptionDataFetcher
you just need to have:
return surveyResultsReactiveSource.map(Message::getPayload);