如何创建对 ActiveMQ 主题的 DGS GraphQL 订阅
How to create a DGS GraphQL Subscription to an ActiveMQ Topic
我的技术栈有点复杂。我正在利用 Netflix DGS 提供 GraphQL 服务。在幕后是一堆从各种服务发送和接收数据的 JMS 组件。我在 GraphQL 订阅之外的一切都在工作。
具体来说,我想做的是为来自 ActiveMQ 主题的消息创建一个 GraphQL 订阅。
所以我有一个 SubscriptionDataFetcher 如下:
@DgsComponent
public class SurveyResultsSubscriptionDataFetcher {
private final Publisher<SurveyResult> surveyResultsReactiveSource;
@Autowired
public SurveyResultsSubscriptionDataFetcher(Publisher<SurveyResult> surveyResultsReactiveSource) {
this.surveyResultsReactiveSource = surveyResultsReactiveSource;
}
@DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
public Publisher<SurveyResult> surveyResults() {
return surveyResultsReactiveSource;
}
}
在我的 Spring 配置中,我使用以下 Spring 集成流程:
@Bean
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public Publisher<SurveyResult> surveyResultsReactiveSource() {
SurveyResultMessageConverter converter = new SurveyResultMessageConverter();
return Flux.from(
IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
.log(LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher())
.map((message) -> converter.fromMessage(message, SurveyResult.class));
}
我说几句:
- 我有一个单独的
@JmsListener
正在接收这些与主题无关的消息
- 我没有看到不止一个消费者,即使在建立网络套接字连接后也是如此。
- 如果我将 Mongo 响应式 Spring 数据存储库连接到此 GraphQL 订阅,客户端会收到数据。
当我将客户端连接到订阅时,我看到以下日志:
PublishSubscribeChannel : Channel 'unknown.channel.name' has 1 subscriber(s).
DgsWebSocketHandler : Subscription started for 1
我怀疑在建立网络套接字连接时消息监听器容器没有被激活。我应该“激活”通道适配器吗?我错过了什么?
技术堆栈:
// spring boots - version 2.4.3
implementation "org.springframework.boot:spring-boot-starter-web"
implementation "org.springframework.boot:spring-boot-starter-activemq"
implementation "org.springframework.boot:spring-boot-starter-data-mongodb-reactive"
implementation "org.springframework.boot:spring-boot-starter-integration"
implementation 'org.springframework.boot:spring-boot-starter-security'
// spring integration
implementation group: 'org.springframework.integration', name: 'spring-integration-jms', version: '5.4.4'
// dgs
implementation "com.netflix.graphql.dgs:graphql-dgs-spring-boot-starter:3.10.2"
implementation 'com.netflix.graphql.dgs:graphql-dgs-subscriptions-websockets-autoconfigure:3.10.2'
更新 1:
就其价值而言,如果我将订阅更新为以下内容,我会在客户端获得结果。
@DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
public Publisher<SurveyResult> surveyResults() {
// repository is a ReactiveMongoRepository
return repository.findAll();
}
更新 2:
这是最终确定的 bean,以防它根据公认的解决方案帮助某人。我需要一个话题的听众,而不是一个队列。
@Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {
SurveyResultMessageConverter converter = new SurveyResultMessageConverter();
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
Jms.container(connectionFactory(), surveyDestination).pubSubDomain(true))
.jmsMessageConverter(converter))
.toReactivePublisher();
}
你的问题在这里:
return Flux.from(
IntegrationFlows.from(
而且框架只是看不到要正确解析和注册的内部 IntegrationFlow
实例。
要使其正常工作,您需要考虑将 IntegrationFlow
声明为顶级 bean。
像这样:
@Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
.log(LoggingHandler.Level.DEBUG)
.transform([transform to SurveyResult])
.toReactivePublisher();
}
现在框架知道这个逻辑IntegrationFlow
容器必须被解析并且所有的bean都必须被注册和启动。
如果您不能为您的转换器提供 Jms.messageDrivenChannelAdapter
,您可能需要重新考虑您的 SurveyResultMessageConverter
逻辑到一个普通的 transform()
。
那么在你的SurveyResultsSubscriptionDataFetcher
中你只需要:
return surveyResultsReactiveSource.map(Message::getPayload);
我的技术栈有点复杂。我正在利用 Netflix DGS 提供 GraphQL 服务。在幕后是一堆从各种服务发送和接收数据的 JMS 组件。我在 GraphQL 订阅之外的一切都在工作。
具体来说,我想做的是为来自 ActiveMQ 主题的消息创建一个 GraphQL 订阅。
所以我有一个 SubscriptionDataFetcher 如下:
@DgsComponent
public class SurveyResultsSubscriptionDataFetcher {
private final Publisher<SurveyResult> surveyResultsReactiveSource;
@Autowired
public SurveyResultsSubscriptionDataFetcher(Publisher<SurveyResult> surveyResultsReactiveSource) {
this.surveyResultsReactiveSource = surveyResultsReactiveSource;
}
@DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
public Publisher<SurveyResult> surveyResults() {
return surveyResultsReactiveSource;
}
}
在我的 Spring 配置中,我使用以下 Spring 集成流程:
@Bean
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public Publisher<SurveyResult> surveyResultsReactiveSource() {
SurveyResultMessageConverter converter = new SurveyResultMessageConverter();
return Flux.from(
IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
.log(LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher())
.map((message) -> converter.fromMessage(message, SurveyResult.class));
}
我说几句:
- 我有一个单独的
@JmsListener
正在接收这些与主题无关的消息 - 我没有看到不止一个消费者,即使在建立网络套接字连接后也是如此。
- 如果我将 Mongo 响应式 Spring 数据存储库连接到此 GraphQL 订阅,客户端会收到数据。
当我将客户端连接到订阅时,我看到以下日志:
PublishSubscribeChannel : Channel 'unknown.channel.name' has 1 subscriber(s).
DgsWebSocketHandler : Subscription started for 1
我怀疑在建立网络套接字连接时消息监听器容器没有被激活。我应该“激活”通道适配器吗?我错过了什么?
技术堆栈:
// spring boots - version 2.4.3
implementation "org.springframework.boot:spring-boot-starter-web"
implementation "org.springframework.boot:spring-boot-starter-activemq"
implementation "org.springframework.boot:spring-boot-starter-data-mongodb-reactive"
implementation "org.springframework.boot:spring-boot-starter-integration"
implementation 'org.springframework.boot:spring-boot-starter-security'
// spring integration
implementation group: 'org.springframework.integration', name: 'spring-integration-jms', version: '5.4.4'
// dgs
implementation "com.netflix.graphql.dgs:graphql-dgs-spring-boot-starter:3.10.2"
implementation 'com.netflix.graphql.dgs:graphql-dgs-subscriptions-websockets-autoconfigure:3.10.2'
更新 1:
就其价值而言,如果我将订阅更新为以下内容,我会在客户端获得结果。
@DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
public Publisher<SurveyResult> surveyResults() {
// repository is a ReactiveMongoRepository
return repository.findAll();
}
更新 2:
这是最终确定的 bean,以防它根据公认的解决方案帮助某人。我需要一个话题的听众,而不是一个队列。
@Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {
SurveyResultMessageConverter converter = new SurveyResultMessageConverter();
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
Jms.container(connectionFactory(), surveyDestination).pubSubDomain(true))
.jmsMessageConverter(converter))
.toReactivePublisher();
}
你的问题在这里:
return Flux.from(
IntegrationFlows.from(
而且框架只是看不到要正确解析和注册的内部 IntegrationFlow
实例。
要使其正常工作,您需要考虑将 IntegrationFlow
声明为顶级 bean。
像这样:
@Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
.log(LoggingHandler.Level.DEBUG)
.transform([transform to SurveyResult])
.toReactivePublisher();
}
现在框架知道这个逻辑IntegrationFlow
容器必须被解析并且所有的bean都必须被注册和启动。
如果您不能为您的转换器提供 Jms.messageDrivenChannelAdapter
,您可能需要重新考虑您的 SurveyResultMessageConverter
逻辑到一个普通的 transform()
。
那么在你的SurveyResultsSubscriptionDataFetcher
中你只需要:
return surveyResultsReactiveSource.map(Message::getPayload);