spring-集成 amqp 出站适配器竞争条件?
spring-integration amqp outbound adapter race condition?
我们的一个生产应用程序中有一个相当复杂的 spring-integration-amqp 用例,我们在启动时看到了一些 "org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers" 异常。在启动时出现初始错误后,我们不再从相同的组件中看到这些异常。这似乎是依赖于 AMQP 出站适配器并最终在生命周期早期使用它们的组件的某种启动竞争条件。
我可以通过在 PostConstruct 方法中调用发送到连接到出站适配器的通道的网关来重现这一点。
配置:
package gadams;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.messaging.MessageChannel;
@SpringBootApplication
@IntegrationComponentScan
public class RabbitRace {
public static void main(String[] args) {
SpringApplication.run(RabbitRace.class, args);
}
@Bean(name = "HelloOut")
public MessageChannel channelHelloOut() {
return MessageChannels.direct().get();
}
@Bean
public Queue queueHello() {
return new Queue("hello.q");
}
@Bean(name = "helloOutFlow")
public IntegrationFlow flowHelloOutToRabbit(RabbitTemplate rabbitTemplate) {
return IntegrationFlows.from("HelloOut").handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("hello.q"))
.get();
}
}
网关:
package gadams;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
@MessagingGateway
public interface HelloGateway {
@Gateway(requestChannel = "HelloOut")
void sendMessage(String message);
}
组件:
package gadams;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
@Component
@DependsOn("helloOutFlow")
public class HelloPublisher {
@Autowired
private HelloGateway helloGateway;
@PostConstruct
public void postConstruct() {
helloGateway.sendMessage("hello");
}
}
在我的生产用例中,我们有一个带有 PostConstruct 方法的组件,我们在其中使用 TaskScheduler 来安排一堆组件,其中一些组件依赖于 AMQP 出站适配器,其中一些最终立即执行。我尝试将 bean 名称放在涉及出站适配器的 IntegrationFlows 上,并在使用网关 and/or 网关本身的 bean 上使用 @DependsOn,但这并没有消除启动时的错误。
那一切都叫Lifecycle
。任何 Spring 集成端点仅在执行 start()
时才开始侦听或生成消息。
通常对于标准默认 autoStartup = true
它在 ApplicationContext.finishRefresh();
中作为
完成
// Propagate refresh to lifecycle processor first.
getLifecycleProcessor().onRefresh();
从@PostConstruct
(afterPropertiesSet()
)开始向频道发送消息真的很早,因为它离finishRefresh()
.
很远
您真的应该重新考虑您的生产逻辑和该实施进入 SmartLifecycle.start()
阶段。
在 Reference Manual 中查看更多信息。
我们的一个生产应用程序中有一个相当复杂的 spring-integration-amqp 用例,我们在启动时看到了一些 "org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers" 异常。在启动时出现初始错误后,我们不再从相同的组件中看到这些异常。这似乎是依赖于 AMQP 出站适配器并最终在生命周期早期使用它们的组件的某种启动竞争条件。
我可以通过在 PostConstruct 方法中调用发送到连接到出站适配器的通道的网关来重现这一点。
配置:
package gadams;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.messaging.MessageChannel;
@SpringBootApplication
@IntegrationComponentScan
public class RabbitRace {
public static void main(String[] args) {
SpringApplication.run(RabbitRace.class, args);
}
@Bean(name = "HelloOut")
public MessageChannel channelHelloOut() {
return MessageChannels.direct().get();
}
@Bean
public Queue queueHello() {
return new Queue("hello.q");
}
@Bean(name = "helloOutFlow")
public IntegrationFlow flowHelloOutToRabbit(RabbitTemplate rabbitTemplate) {
return IntegrationFlows.from("HelloOut").handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("hello.q"))
.get();
}
}
网关:
package gadams;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
@MessagingGateway
public interface HelloGateway {
@Gateway(requestChannel = "HelloOut")
void sendMessage(String message);
}
组件:
package gadams;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
@Component
@DependsOn("helloOutFlow")
public class HelloPublisher {
@Autowired
private HelloGateway helloGateway;
@PostConstruct
public void postConstruct() {
helloGateway.sendMessage("hello");
}
}
在我的生产用例中,我们有一个带有 PostConstruct 方法的组件,我们在其中使用 TaskScheduler 来安排一堆组件,其中一些组件依赖于 AMQP 出站适配器,其中一些最终立即执行。我尝试将 bean 名称放在涉及出站适配器的 IntegrationFlows 上,并在使用网关 and/or 网关本身的 bean 上使用 @DependsOn,但这并没有消除启动时的错误。
那一切都叫Lifecycle
。任何 Spring 集成端点仅在执行 start()
时才开始侦听或生成消息。
通常对于标准默认 autoStartup = true
它在 ApplicationContext.finishRefresh();
中作为
// Propagate refresh to lifecycle processor first.
getLifecycleProcessor().onRefresh();
从@PostConstruct
(afterPropertiesSet()
)开始向频道发送消息真的很早,因为它离finishRefresh()
.
您真的应该重新考虑您的生产逻辑和该实施进入 SmartLifecycle.start()
阶段。
在 Reference Manual 中查看更多信息。