覆盖的 RabbitSourceConfiguration(应用启动器)不适用于 Spring Cloud Edgware

Overridden RabbitSourceConfiguration (app starters) does not work with Spring Cloud Edgware

我正在测试 Spring Cloud DataFlow 服务从 Spring Cloud Dalston 的升级。SR4/Spring Boot 1.5.9 到 Spring Cloud Edgware/Spring 启动 1.5.9。我的一些服务从应用程序启动器扩展源(或接收器)组件。我发现这不适用于 Spring Cloud Edgware。

例如,我覆盖了 org.springframework.cloud.stream.app.rabbit.source.RabbitSourceConfiguration 并将我的应用程序绑定到我覆盖的版本。这以前曾与 Spring 云版本一起使用将近一年。

使用 Edgware,我得到以下信息(无论该应用程序是 运行 独立的还是在数据流中):

***************************
APPLICATION FAILED TO START
***************************

Description:

Field channels in org.springframework.cloud.stream.app.rabbit.source.RabbitSourceConfiguration required a bean of type 'org.springframework.cloud.stream.messaging.Source' that could not be found.


Action:

Consider defining a bean of type 'org.springframework.cloud.stream.messaging.Source' in your configuration.

我得到与 spring-cloud-starter-stream-rabbit.

的 1.3.0.RELEASE 和 1.2.0.RELEASE 相同的行为

我重写了 RabbitSourceConfiguration,因此我可以在 AmqpInboundChannelAdapter 上设置一个 header 映射器,并在启动容器之前执行连接测试。

我的子类使用 @EnableBinding(HeaderMapperRabbitSourceConfiguration.class) 绑定到 Spring 引导应用程序。我的子类的简化版本是:

public class HeaderMapperRabbitSourceConfiguration extends RabbitSourceConfiguration {

    public HeaderMapperRabbitSourceConfiguration(final MyHealthCheck healthCheck,
                                                 final MyAppConfig config) {
        // ...
    }

    @Bean
    @Override
    public AmqpInboundChannelAdapter adapter() {
        final AmqpInboundChannelAdapter adapter = super.adapter();
        adapter.setHeaderMapper(new NotificationHeaderMapper(config));

        return adapter;
    }

    @Bean
    @Override
    public SimpleMessageListenerContainer container() {
        if (config.performConnectivityCheckOnStartup()) {

            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Attempting connectivity with ...");
            }
            final Health health = healthCheck.health();
            if (health.getStatus() == Status.DOWN) {
                LOGGER.error("Unable to connect .....");
                throw new UnableToLoginException("Unable to connect ...");
            } else if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Connectivity established with ...");
            }
        }

        return super.container();
    }
}

你真的不应该在 @Bean 定义中做类似 healthCheck.health(); 的事情。应用程序上下文尚未完全烘焙或启动;它可能工作,也可能不工作,具体取决于创建 bean 的顺序。

如果您想阻止应用程序启动,请添加一个实现 SmartLifecycle 的 bean,将 bean 置于后期阶段(高值),以便它在其他一切之后启动。然后将您的代码放入start()autStartup 一定是真的。

在这种情况下,它是 运行 在流基础结构创建频道之前。

某些顺序可能与早期版本有所不同,但无论如何,在 @Bean 定义中执行这样的 activity 是危险的。

只是你之前运气好而已。

编辑

我刚刚注意到你的 @EnableBinding 是错误的;应该是 Source.class。我看不出它是如何工作的——这就是为类型为 Source.

channels 字段创建 bean 的原因

将流和活页夹更新到 1.3 后,这对我来说很好用。0.RELEASE...

@Configuration
public class MySource extends RabbitSourceConfiguration {

    @Bean
    @Override
    public AmqpInboundChannelAdapter adapter() {
        AmqpInboundChannelAdapter adapter = super.adapter();
        adapter.setHeaderMapper(new MyMapper());
        return adapter;
    }


}

@SpringBootApplication
@EnableBinding(Source.class)
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

如果这不起作用,请编辑问题以显示您的 POM。