Spring 集成中无法将订户附加到 BroadCastingDispatcher

Unable to attach subscriber to BroadCastingDispatcher in Spring Integration

按照 Bootiful GCP: Integration with Google Cloud Pub/Sub (4/8) 上的这个例子,我试图构建一个流程,我从 Google Pubsub 订阅中读取并写入另一个主题。

DEBUG 模式下启动我的应用程序后,我可以看到消息是从 Google PubSub 到达的,但它们没有收到 "consumed" 因为这个

o.s.i.dispatcher.BroadcastingDispatcher : No subscribers, default behavior is ignore

非常感谢任何帮助。

以下是我的主要代码的样子-

public class PubsubRouteBuilderService {

    private final PubSubTemplate pubSubTemplate; // injected via Spring

    public PubsubRouteBuilderService(PubSubTemplate pubSubTemplate) {
        this.pubSubTemplate = pubSubTemplate;
    }

    public synchronized boolean buildRoute(PubsubRouteModel pubsubRouteModel) {
        log.info("Building route for: {}", pubsubRouteModel);
        buildPubsubRoute(pubsubRouteModel);
        // some unrelated logic
        return true;
    }

    private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {

        final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(
            RouteBuilderFactory
                    .messageChannelAdapter(
                            RouteBuilderFactory.getMessageChannel(),
                            pubSubTemplate,
                            pubsubRouteModel.getFromSub()))
            .handle(
                    message -> {
                        log.info("consumed new message: [" + message.getPayload() + "]");
                        AckReplyConsumer consumer = message.getHeaders()
                                .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);
                        consumer.ack();
                    })
            .get();

        standardIntegrationFlow.start();
    }
}

以下是RouteBuilderFactory中的其他方法-

public static MessageChannel getMessageChannel() {
    return MessageChannels.publishSubscribe().get();
}

public static PubSubInboundChannelAdapter messageChannelAdapter(MessageChannel inputChannel, PubSubTemplate pubSubTemplate, String channelName) {
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, channelName);
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);
    return adapter;
}

您的代码似乎根本不是基于该博客 post...

private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {

    final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(
        RouteBuilderFactory
                .messageChannelAdapter(
                        RouteBuilderFactory.getMessageChannel(),
                        pubSubTemplate,
                        pubsubRouteModel.getFromSub()))
        .handle(
                message -> {
                    log.info("consumed new message: [" + message.getPayload() + "]");
                    AckReplyConsumer consumer = message.getHeaders()
                            .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);
                    consumer.ack();
                })
        .get();

    standardIntegrationFlow.start();
}

您不能只 "start" 一些任意 IntegrationFlow 对象 - 它必须由 Spring 管理(声明为 @Bean)。

框架在幕后构建了一些基础设施来使所有这些工作正常进行。