Spring Cloud Dataflow errorChannel 不工作
Spring Cloud Dataflow errorChannel not working
我正在尝试为我的 Spring Cloud Dataflow 流创建一个自定义异常处理程序,以路由一些错误以重新排队,而另一些错误则进行 DLQ。
为此,我正在利用全局 Spring 集成 "errorChannel" 和基于异常类型的路由。
这是 Spring 集成错误路由器的代码:
package com.acme.error.router;
import com.acme.exceptions.DlqException;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
@MessageEndpoint
@EnableBinding({ ErrorMessageChannels.class })
public class ErrorMessageMappingRouter {
private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMessageMappingRouter.class);
public static final String ERROR_CHANNEL = "errorChannel";
@Router(inputChannel = ERROR_CHANNEL)
public String onError(Message<Object> message) {
LOGGER.debug("ERROR ROUTER - onError");
if(message.getPayload() instanceof MessageTransformationException) {
MessageTransformationException exception = (MessageTransformationException) message.getPayload();
Message<?> failedMessage = exception.getFailedMessage();
if(exceptionChainContainsDlq(exception)) {
return ErrorMessageChannels.DLQ_QUEUE_NAME;
}
return ErrorMessageChannels.REQUEUE_CHANNEL;
}
return ErrorMessageChannels.DLQ_QUEUE_NAME;
}
...
}
错误路由器由每个流应用程序通过 Spring 启动应用程序上的包扫描为每个应用程序拾取:
@ComponentScan(basePackages = { "com.acme.error.router" }
@SpringBootApplication
public class StreamApp {}
在本地 Spring Cloud Dataflow 服务器(版本 1.5.0-RELEASE)部署并 运行 时,抛出 DlqException,消息成功路由到 onError 方法在 errorRouter 中,然后放入 dlq 主题中。
然而,当它作为 docker 容器部署到 SCDF Kubernetes 服务器(也是 1.5.0-RELEASE 版本)时,永远不会触发 onError 方法。 (router开头的log语句从不输出)
在流应用程序的启动日志中,bean 似乎被正确拾取并注册为 errorChannel 的侦听器,但由于某种原因,当抛出异常时,onError 方法不会处理它们在我们的路由器中。
启动日志:
o.s.i.endpoint.EventDrivenConsumer : Adding {router:errorMessageMappingRouter.onError.router} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel : Channel 'errorChannel' has 1 subscriber(s).
o.s.i.endpoint.EventDrivenConsumer : started errorMessageMappingRouter.onError.router
我们正在使用 spring 云流和 kafka 活页夹配置的所有默认设置:
spring.cloud:
stream:
binders:
kafka:
type: kafka
environment.spring.cloud.stream.kafka.binder.brokers=brokerlist
environment.spring.cloud.stream.kafka.binder.zkNodes=zklist
编辑:添加了来自 kubectl describe <pod>
的 pod args
Args:
--spring.cloud.stream.bindings.input.group=delivery-stream
--spring.cloud.stream.bindings.output.producer.requiredGroups=delivery-stream
--spring.cloud.stream.bindings.output.destination=delivery-stream.enricher
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.zkNodes=<zkNodes>
--spring.cloud.stream.binders.xdkafka.type=kafka
--spring.cloud.stream.binders.xdkafka.defaultCandidate=true
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.brokers=<brokers>
--spring.cloud.stream.bindings.input.destination=delivery-stream.config-enricher
我们尝试的另一个想法是尝试使用 Spring Cloud Stream - spring 集成错误通道支持将错误发送到代理主题,但由于消息似乎不是完全登陆全局 Spring 集成错误通道,这也不起作用。
我们需要在 SCDF Kubernetes 中做些什么来启用全局 Spring Integration errorChannel?
我在这里错过了什么?
根据评论更新解决方案:
After reviewing your configuration I am now pretty sure I know what
the issue is. You have a multi-binder configuration scenario. Even if
you only deal with a single binder instance the existence of
spring.cloud.stream.binders.... is what's going to make framework
treat it as multi-binder. Basically this a bug -
github.com/spring-cloud/spring-cloud-stream/issues/1384. As you can
see it was fixed but you need to upgrade to Elmhurst.SR2 or grab the
latest snapshot (we're in RC2 and 2.1.0.RELEASE is in few weeks
anyway) – Oleg Zhurakousky
这确实是我们设置的问题。我们没有升级,只是暂时取消了 multi-binder 的使用,问题就解决了。
根据评论更新解决方案:
After reviewing your configuration I am now pretty sure I know what
the issue is. You have a multi-binder configuration scenario. Even if
you only deal with a single binder instance the existence of
spring.cloud.stream.binders.... is what's going to make framework
treat it as multi-binder. Basically this a bug -
github.com/spring-cloud/spring-cloud-stream/issues/1384. As you can
see it was fixed but you need to upgrade to Elmhurst.SR2 or grab the
latest snapshot (we're in RC2 and 2.1.0.RELEASE is in few weeks
anyway) – Oleg Zhurakousky
这确实是我们设置的问题。我们没有升级,只是暂时取消了 multi-binder 的使用,问题就解决了。
我正在尝试为我的 Spring Cloud Dataflow 流创建一个自定义异常处理程序,以路由一些错误以重新排队,而另一些错误则进行 DLQ。
为此,我正在利用全局 Spring 集成 "errorChannel" 和基于异常类型的路由。
这是 Spring 集成错误路由器的代码:
package com.acme.error.router;
import com.acme.exceptions.DlqException;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
@MessageEndpoint
@EnableBinding({ ErrorMessageChannels.class })
public class ErrorMessageMappingRouter {
private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMessageMappingRouter.class);
public static final String ERROR_CHANNEL = "errorChannel";
@Router(inputChannel = ERROR_CHANNEL)
public String onError(Message<Object> message) {
LOGGER.debug("ERROR ROUTER - onError");
if(message.getPayload() instanceof MessageTransformationException) {
MessageTransformationException exception = (MessageTransformationException) message.getPayload();
Message<?> failedMessage = exception.getFailedMessage();
if(exceptionChainContainsDlq(exception)) {
return ErrorMessageChannels.DLQ_QUEUE_NAME;
}
return ErrorMessageChannels.REQUEUE_CHANNEL;
}
return ErrorMessageChannels.DLQ_QUEUE_NAME;
}
...
}
错误路由器由每个流应用程序通过 Spring 启动应用程序上的包扫描为每个应用程序拾取:
@ComponentScan(basePackages = { "com.acme.error.router" }
@SpringBootApplication
public class StreamApp {}
在本地 Spring Cloud Dataflow 服务器(版本 1.5.0-RELEASE)部署并 运行 时,抛出 DlqException,消息成功路由到 onError 方法在 errorRouter 中,然后放入 dlq 主题中。
然而,当它作为 docker 容器部署到 SCDF Kubernetes 服务器(也是 1.5.0-RELEASE 版本)时,永远不会触发 onError 方法。 (router开头的log语句从不输出)
在流应用程序的启动日志中,bean 似乎被正确拾取并注册为 errorChannel 的侦听器,但由于某种原因,当抛出异常时,onError 方法不会处理它们在我们的路由器中。
启动日志:
o.s.i.endpoint.EventDrivenConsumer : Adding {router:errorMessageMappingRouter.onError.router} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel : Channel 'errorChannel' has 1 subscriber(s).
o.s.i.endpoint.EventDrivenConsumer : started errorMessageMappingRouter.onError.router
我们正在使用 spring 云流和 kafka 活页夹配置的所有默认设置:
spring.cloud:
stream:
binders:
kafka:
type: kafka
environment.spring.cloud.stream.kafka.binder.brokers=brokerlist
environment.spring.cloud.stream.kafka.binder.zkNodes=zklist
编辑:添加了来自 kubectl describe <pod>
Args:
--spring.cloud.stream.bindings.input.group=delivery-stream
--spring.cloud.stream.bindings.output.producer.requiredGroups=delivery-stream
--spring.cloud.stream.bindings.output.destination=delivery-stream.enricher
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.zkNodes=<zkNodes>
--spring.cloud.stream.binders.xdkafka.type=kafka
--spring.cloud.stream.binders.xdkafka.defaultCandidate=true
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.brokers=<brokers>
--spring.cloud.stream.bindings.input.destination=delivery-stream.config-enricher
我们尝试的另一个想法是尝试使用 Spring Cloud Stream - spring 集成错误通道支持将错误发送到代理主题,但由于消息似乎不是完全登陆全局 Spring 集成错误通道,这也不起作用。
我们需要在 SCDF Kubernetes 中做些什么来启用全局 Spring Integration errorChannel?
我在这里错过了什么?
根据评论更新解决方案:
After reviewing your configuration I am now pretty sure I know what the issue is. You have a multi-binder configuration scenario. Even if you only deal with a single binder instance the existence of spring.cloud.stream.binders.... is what's going to make framework treat it as multi-binder. Basically this a bug - github.com/spring-cloud/spring-cloud-stream/issues/1384. As you can see it was fixed but you need to upgrade to Elmhurst.SR2 or grab the latest snapshot (we're in RC2 and 2.1.0.RELEASE is in few weeks anyway) – Oleg Zhurakousky
这确实是我们设置的问题。我们没有升级,只是暂时取消了 multi-binder 的使用,问题就解决了。
根据评论更新解决方案:
After reviewing your configuration I am now pretty sure I know what the issue is. You have a multi-binder configuration scenario. Even if you only deal with a single binder instance the existence of spring.cloud.stream.binders.... is what's going to make framework treat it as multi-binder. Basically this a bug - github.com/spring-cloud/spring-cloud-stream/issues/1384. As you can see it was fixed but you need to upgrade to Elmhurst.SR2 or grab the latest snapshot (we're in RC2 and 2.1.0.RELEASE is in few weeks anyway) – Oleg Zhurakousky
这确实是我们设置的问题。我们没有升级,只是暂时取消了 multi-binder 的使用,问题就解决了。