Spring 集成中无法将订户附加到 BroadCastingDispatcher
Unable to attach subscriber to BroadCastingDispatcher in Spring Integration
按照 Bootiful GCP: Integration with Google Cloud Pub/Sub (4/8) 上的这个例子,我试图构建一个流程,我从 Google Pubsub 订阅中读取并写入另一个主题。
在 DEBUG
模式下启动我的应用程序后,我可以看到消息是从 Google PubSub
到达的,但它们没有收到 "consumed" 因为这个
o.s.i.dispatcher.BroadcastingDispatcher : No subscribers, default behavior is ignore
非常感谢任何帮助。
以下是我的主要代码的样子-
public class PubsubRouteBuilderService {
private final PubSubTemplate pubSubTemplate; // injected via Spring
public PubsubRouteBuilderService(PubSubTemplate pubSubTemplate) {
this.pubSubTemplate = pubSubTemplate;
}
public synchronized boolean buildRoute(PubsubRouteModel pubsubRouteModel) {
log.info("Building route for: {}", pubsubRouteModel);
buildPubsubRoute(pubsubRouteModel);
// some unrelated logic
return true;
}
private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {
final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(
RouteBuilderFactory
.messageChannelAdapter(
RouteBuilderFactory.getMessageChannel(),
pubSubTemplate,
pubsubRouteModel.getFromSub()))
.handle(
message -> {
log.info("consumed new message: [" + message.getPayload() + "]");
AckReplyConsumer consumer = message.getHeaders()
.get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);
consumer.ack();
})
.get();
standardIntegrationFlow.start();
}
}
以下是RouteBuilderFactory
中的其他方法-
public static MessageChannel getMessageChannel() {
return MessageChannels.publishSubscribe().get();
}
public static PubSubInboundChannelAdapter messageChannelAdapter(MessageChannel inputChannel, PubSubTemplate pubSubTemplate, String channelName) {
PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, channelName);
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
您的代码似乎根本不是基于该博客 post...
private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {
final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(
RouteBuilderFactory
.messageChannelAdapter(
RouteBuilderFactory.getMessageChannel(),
pubSubTemplate,
pubsubRouteModel.getFromSub()))
.handle(
message -> {
log.info("consumed new message: [" + message.getPayload() + "]");
AckReplyConsumer consumer = message.getHeaders()
.get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);
consumer.ack();
})
.get();
standardIntegrationFlow.start();
}
您不能只 "start" 一些任意 IntegrationFlow
对象 - 它必须由 Spring 管理(声明为 @Bean
)。
框架在幕后构建了一些基础设施来使所有这些工作正常进行。
按照 Bootiful GCP: Integration with Google Cloud Pub/Sub (4/8) 上的这个例子,我试图构建一个流程,我从 Google Pubsub 订阅中读取并写入另一个主题。
在 DEBUG
模式下启动我的应用程序后,我可以看到消息是从 Google PubSub
到达的,但它们没有收到 "consumed" 因为这个
o.s.i.dispatcher.BroadcastingDispatcher : No subscribers, default behavior is ignore
非常感谢任何帮助。
以下是我的主要代码的样子-
public class PubsubRouteBuilderService {
private final PubSubTemplate pubSubTemplate; // injected via Spring
public PubsubRouteBuilderService(PubSubTemplate pubSubTemplate) {
this.pubSubTemplate = pubSubTemplate;
}
public synchronized boolean buildRoute(PubsubRouteModel pubsubRouteModel) {
log.info("Building route for: {}", pubsubRouteModel);
buildPubsubRoute(pubsubRouteModel);
// some unrelated logic
return true;
}
private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {
final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(
RouteBuilderFactory
.messageChannelAdapter(
RouteBuilderFactory.getMessageChannel(),
pubSubTemplate,
pubsubRouteModel.getFromSub()))
.handle(
message -> {
log.info("consumed new message: [" + message.getPayload() + "]");
AckReplyConsumer consumer = message.getHeaders()
.get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);
consumer.ack();
})
.get();
standardIntegrationFlow.start();
}
}
以下是RouteBuilderFactory
中的其他方法-
public static MessageChannel getMessageChannel() {
return MessageChannels.publishSubscribe().get();
}
public static PubSubInboundChannelAdapter messageChannelAdapter(MessageChannel inputChannel, PubSubTemplate pubSubTemplate, String channelName) {
PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, channelName);
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
您的代码似乎根本不是基于该博客 post...
private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {
final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(
RouteBuilderFactory
.messageChannelAdapter(
RouteBuilderFactory.getMessageChannel(),
pubSubTemplate,
pubsubRouteModel.getFromSub()))
.handle(
message -> {
log.info("consumed new message: [" + message.getPayload() + "]");
AckReplyConsumer consumer = message.getHeaders()
.get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);
consumer.ack();
})
.get();
standardIntegrationFlow.start();
}
您不能只 "start" 一些任意 IntegrationFlow
对象 - 它必须由 Spring 管理(声明为 @Bean
)。
框架在幕后构建了一些基础设施来使所有这些工作正常进行。