How do I prevent ListenerExecutionFailedException: Listener threw exception

What do I need to do to prevent the exception below, which is presumably thrown by RabbitMQ?

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:877)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:787)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:707)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$01(SimpleMessageListenerContainer.java:98)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1236)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:688)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1190)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1174)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:98)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1363)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'amqpLaunchSpringBatchJobFlow.channel#0'; nested exception is jp.ixam_drive.batch.service.JobExecutionRuntimeException: Failed to start job with name ads-insights-import and parameters {accessToken=<ACCESS_TOKEN>, id=act_1234567890, classifier=stats, report_run_id=1482330625184792, job_request_id=32}
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access0(AmqpInboundChannelAdapter.java:45)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.onMessage(AmqpInboundChannelAdapter.java:95)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:784)
    ... 10 common frames omitted
    Caused by: jp.ixam_drive.batch.service.JobExecutionRuntimeException: Failed to start job with name ads-insights-import and parameters {accessToken=<ACCESS_TOKEN>, id=act_1234567890, classifier=stats, report_run_id=1482330625184792, job_request_id=32}
    at jp.ixam_drive.facebook.SpringBatchLauncher.launchJob(SpringBatchLauncher.java:42)
    at jp.ixam_drive.facebook.AmqpBatchLaunchIntegrationFlows.lambda$amqpLaunchSpringBatchJobFlow(AmqpBatchLaunchIntegrationFlows.java:71)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    ... 18 common frames omitted
    Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={accessToken=<ACCESS_TOKEN>, id=act_1234567890, classifier=stats, report_run_id=1482330625184792, job_request_id=32}.  If you want to run this job again, change the parameters.
    at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:126)
    at sun.reflect.GeneratedMethodAccessor193.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.transaction.interceptor.TransactionInterceptor.proceedWithInvocation(TransactionInterceptor.java:99)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:282)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean.invoke(AbstractJobRepositoryFactoryBean.java:172)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
    at com.sun.proxy.$Proxy125.createJobExecution(Unknown Source)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125)
    at jp.ixam_drive.batch.service.JobOperationsService.launch(JobOperationsService.java:64)
    at jp.ixam_drive.facebook.SpringBatchLauncher.launchJob(SpringBatchLauncher.java:37)
    ... 24 common frames omitted

It happens when I have two instances of a Spring Boot application running the following code in parallel to execute Spring Batch jobs:

@Configuration
@Conditional(AmqpBatchLaunchCondition.class)
@Slf4j
public class AmqpAsyncAdsInsightsConfiguration {

    @Autowired
    ObjectMapper objectMapper;

    @Value("${batch.launch.amqp.routing-keys.async-insights}")
    String routingKey;

    @Bean
    public IntegrationFlow amqpOutboundAsyncAdsInsights(AmqpTemplate amqpTemplate) {
        return IntegrationFlows.from("async_ads_insights")
                .<JobParameters, byte[]>transform(SerializationUtils::serialize)
                .handle(Amqp.outboundAdapter(amqpTemplate).routingKey(routingKey)).get();
    }

    @Bean
    public IntegrationFlow amqpAdsInsightsAsyncJobRequestFlow(FacebookMarketingServiceProvider serviceProvider,
            JobParametersToApiParametersTransformer transformer, ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, routingKey))
                .<byte[], JobParameters>transform(SerializationUtils::deserialize)
                .<JobParameters, ApiParameters>transform(transformer)
                .<ApiParameters>handle((payload, header) -> {
                    String accessToken = (String) header.get("accessToken");
                    String id = (String) header.get("object_id");
                    FacebookMarketingApi api = serviceProvider.getApi(accessToken);
                    String reportRunId = api.asyncRequestOperations().getReportRunId(id, payload.toMap());
                    ObjectNode objectNode = objectMapper.createObjectNode();
                    objectNode.put("accessToken", accessToken);
                    objectNode.put("id", id);
                    objectNode.put("report_run_id", reportRunId);
                    objectNode.put("classifier", (String) header.get("classifier"));
                    objectNode.put("job_request_id", (Long) header.get("job_request_id"));
                    return serialize(objectNode);
                }).channel("ad_report_run_polling_channel").get();
    }

    @SneakyThrows
    private String serialize(JsonNode jsonNode) {
        return objectMapper.writeValueAsString(jsonNode);
    }
}

@Configuration
@Conditional(AmqpBatchLaunchCondition.class)
@Slf4j
public class AmqpBatchLaunchIntegrationFlows {

    @Autowired
    SpringBatchLauncher batchLauncher;

    @Value("${batch.launch.amqp.routing-keys.job-launch}")
    String routingKey;

    @Bean(name = "batch_launch_channel")
    public MessageChannel batchLaunchChannel() {
        return MessageChannels.executor(Executors.newSingleThreadExecutor()).get();
    }

    @Bean
    public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,
            @Qualifier("batch_launch_channel") MessageChannel batchLaunchChannel) {
        return IntegrationFlows.from(batchLaunchChannel)
                .<JobParameters, byte[]>transform(SerializationUtils::serialize)
                .handle(Amqp.outboundAdapter(amqpTemplate).routingKey(routingKey)).get();
    }

    @Bean
    public IntegrationFlow amqpLaunchSpringBatchJobFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, routingKey))
                .handle(message -> {
                    String jobName = (String) message.getHeaders().get("job_name");
                    byte[] bytes = (byte[]) message.getPayload();
                    JobParameters jobParameters = SerializationUtils.deserialize(bytes);
                    batchLauncher.launchJob(jobName, jobParameters);
                }).get();
    }
}

@Configuration
@Slf4j
public class AsyncAdsInsightsConfiguration {

    @Value("${batch.core.pool.size}")
    public Integer batchCorePoolSize;

    @Value("${ixam_drive.facebook.api.ads-insights.async-poll-interval}")
    public String asyncPollInterval;

    @Autowired
    ObjectMapper objectMapper;

    @Autowired
    private DataSource dataSource;

    @Bean(name = "async_ads_insights")
    public MessageChannel adsInsightsAsyncJobRequestChannel() {
        return MessageChannels.direct().get();
    }

    @Bean(name = "ad_report_run_polling_channel")
    public MessageChannel adReportRunPollingChannel() {
        return MessageChannels.executor(Executors.newFixedThreadPool(batchCorePoolSize)).get();
    }

    @Bean
    public IntegrationFlow adReportRunPollingLoopFlow(FacebookMarketingServiceProvider serviceProvider) {
        return IntegrationFlows.from(adReportRunPollingChannel())
                .<String>handle((payload, header) -> {
                    ObjectNode jsonNode = deserialize(payload);
                    String accessToken = jsonNode.get("accessToken").asText();
                    String reportRunId = jsonNode.get("report_run_id").asText();
                    try {
                        AdReportRun adReportRun = serviceProvider.getApi(accessToken)
                                .fetchObject(reportRunId, AdReportRun.class);
                        log.debug("ad_report_run: {}", adReportRun);
                        return jsonNode.set("ad_report_run", objectMapper.valueToTree(adReportRun));
                    } catch (Exception e) {
                        log.error("failed while polling for ad_report_run.id: {}", reportRunId);
                        throw new RuntimeException(e);
                    }
                }).<JsonNode, Boolean>route(payload -> {
                    JsonNode adReportRun = payload.get("ad_report_run");
                    return adReportRun.get("async_percent_completion").asInt() == 100 &&
                            "Job Completed".equals(adReportRun.get("async_status").asText());
                }, rs -> rs.subFlowMapping(true,
                        f -> f.transform(JsonNode.class,
                                source -> {
                                    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
                                    jobParametersBuilder
                                            .addString("accessToken", source.get("accessToken").asText());
                                    jobParametersBuilder.addString("id", source.get("id").asText());
                                    jobParametersBuilder
                                            .addString("classifier", source.get("classifier").asText());
                                    jobParametersBuilder
                                            .addLong("report_run_id", source.get("report_run_id").asLong());
                                    jobParametersBuilder
                                            .addLong("job_request_id", source.get("job_request_id").asLong());
                                    return jobParametersBuilder.toJobParameters();
                                }).channel("batch_launch_channel"))
                        .subFlowMapping(false,
                                f -> f.transform(JsonNode.class, this::serialize)
                                        .<String>delay("delay", asyncPollInterval, c -> c.transactional()
                                                .messageStore(jdbcMessageStore()))
                                        .channel(adReportRunPollingChannel()))).get();
    }

    @SneakyThrows
    private String serialize(JsonNode jsonNode) {
        return objectMapper.writeValueAsString(jsonNode);
    }

    @SneakyThrows
    private ObjectNode deserialize(String payload) {
        return objectMapper.readerFor(ObjectNode.class).readValue(payload);
    }

    @Bean
    public JdbcMessageStore jdbcMessageStore() {
        JdbcMessageStore jdbcMessageStore = new JdbcMessageStore(dataSource);
        return jdbcMessageStore;
    }

    @Bean
    public JobParametersToApiParametersTransformer jobParametersToApiParametersTransformer() {
        return new JobParametersToApiParametersTransformer() {
            @Override
            protected ApiParameters transform(JobParameters jobParameters) {
                ApiParameters.ApiParametersBuilder builder = ApiParameters.builder();
                MultiValueMap<String, String> multiValueMap = new LinkedMultiValueMap<>();
                String level = jobParameters.getString("level");
                if (!StringUtils.isEmpty(level)) {
                    multiValueMap.set("level", level);
                }
                String fields = jobParameters.getString("fields");
                if (!StringUtils.isEmpty(fields)) {
                    multiValueMap.set("fields", fields);
                }
                String filter = jobParameters.getString("filter");
                if (filter != null) {
                    try {
                        JsonNode jsonNode = objectMapper.readTree(filter);
                        if (jsonNode != null && jsonNode.isArray()) {
                            List<ApiFilteringParameters> filteringParametersList = new ArrayList<>();
                            List<ApiSingleValueFilteringParameters> singleValueFilteringParameters = new ArrayList<>();
                            ArrayNode arrayNode = (ArrayNode) jsonNode;
                            arrayNode.forEach(node -> {
                                String field = node.get("field").asText();
                                String operator = node.get("operator").asText();
                                if (!StringUtils.isEmpty(field) && !StringUtils.isEmpty(operator)) {
                                    String values = node.get("values").asText();
                                    String[] valuesArray = !StringUtils.isEmpty(values) ? values.split(",") : null;
                                    if (valuesArray != null) {
                                        if (valuesArray.length > 1) {
                                            filteringParametersList.add(ApiFilteringParameters
                                                    .of(field, Operator.valueOf(operator), valuesArray));
                                        } else {
                                            singleValueFilteringParameters.add(ApiSingleValueFilteringParameters
                                                    .of(field, Operator.valueOf(operator), valuesArray[0]));
                                        }
                                    }
                                }
                            });
                            if (!filteringParametersList.isEmpty()) {
                                builder.filterings(filteringParametersList);
                            }
                            if (!singleValueFilteringParameters.isEmpty()) {
                                builder.filterings(singleValueFilteringParameters);
                            }
                        }

                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }
                String start = jobParameters.getString("time_ranges.start");
                String end = jobParameters.getString("time_ranges.end");
                String since = jobParameters.getString("time_range.since");
                String until = jobParameters.getString("time_range.until");

                if (!StringUtils.isEmpty(start) && !StringUtils.isEmpty(end)) {
                    builder.timeRanges(ApiParameters.timeRanges(start, end));
                } else if (!StringUtils.isEmpty(since) && !StringUtils.isEmpty(until)) {
                    builder.timeRange(new TimeRange(since, until));
                }
                String actionBreakdowns = jobParameters.getString("action_breakdowns");
                if (!StringUtils.isEmpty(actionBreakdowns)) {
                    multiValueMap.set("action_breakdowns", actionBreakdowns);
                }
                String attributionWindows = jobParameters.getString("action_attribution_windows");
                if (attributionWindows != null) {
                    try {
                        multiValueMap
                                .set("action_attribution_windows",
                                        objectMapper.writeValueAsString(attributionWindows.split(",")));
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                }
                builder.multiValueMap(multiValueMap);
                String pageSize = jobParameters.getString("pageSize");
                if (!StringUtils.isEmpty(pageSize)) {
                    builder.limit(pageSize);
                }
                return builder.build();
            }
        };
    }
}

The message flow is as follows:

   1. channel[async_ads_insights] -> IntegrationFlow[amqpOutboundAsyncAdsInsights] -> [AMQP] -> IntegrationFlow[amqpAdsInsightsAsyncJobRequestFlow] -> channel[ad_report_run_polling_channel] -> IntegrationFlow[adReportRunPollingLoopFlow] -> if the polling loop is finished -> channel[batch_launch_channel], else -> channel[ad_report_run_polling_channel]

   2. channel[batch_launch_channel] -> IntegrationFlow[amqpOutbound] -> [AMQP] -> IntegrationFlow[amqpLaunchSpringBatchJobFlow]

   3. Spring Batch Job is launched.

The exception is not thrown right after the two instances start, but some time later. Launching the Spring Batch jobs succeeds at first, but then they start failing with "A job instance already exists and is complete for...".

The job retrieves Facebook ads results.

I would really appreciate any insight into what is causing the error above.

I also have this configuration, which does not use AMQP and works without any problems, but it only supports a single instance.

@Configuration
@Conditional(SimpleBatchLaunchCondition.class)
@Slf4j
public class SimpleBatchLaunchIntegrationFlows {

    @Autowired
    SpringBatchLauncher batchLauncher;

    @Autowired
    DataSource dataSource;

    @Bean(name = "batch_launch_channel")
    public MessageChannel batchLaunchChannel() {
        return MessageChannels.queue(jdbcChannelMessageStore(), "batch_launch_channel").get();
    }

    @Bean
    public ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider() {
        return new MySqlChannelMessageStoreQueryProvider();
    }

    @Bean
    public JdbcChannelMessageStore jdbcChannelMessageStore() {
        JdbcChannelMessageStore channelMessageStore = new JdbcChannelMessageStore(dataSource);
        channelMessageStore.setChannelMessageStoreQueryProvider(channelMessageStoreQueryProvider());
        channelMessageStore.setUsingIdCache(true);
        channelMessageStore.setPriorityEnabled(true);
        return channelMessageStore;
    }

    @Bean
    public IntegrationFlow launchSpringBatchJobFlow(@Qualifier("batch_launch_channel")
            MessageChannel batchLaunchChannel) {
        return IntegrationFlows.from(batchLaunchChannel)
                .handle(message -> {
                    String jobName = (String) message.getHeaders().get("job_name");
                    JobParameters jobParameters = (JobParameters) message.getPayload();
                    batchLauncher.launchJob(jobName, jobParameters);
                }, e->e.poller(Pollers.fixedRate(500).receiveTimeout(500))).get();
    }
}

See the Spring Batch documentation: when launching a new instance of a job, the job parameters must be unique.

A common solution is to add a dummy parameter with a UUID or similar, but Batch also provides strategies for this, e.g. incrementing a numeric parameter on every launch.
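
For example, a minimal sketch of both approaches (JobParametersBuilder and RunIdIncrementer are Spring Batch classes; the job, jobLauncher and incoming jobParameters here are assumptions, not beans from the code above):

import java.util.UUID;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;

public class UniqueParametersLauncher {

    // Option 1: add a dummy parameter that is unique for every launch.
    public JobExecution launchWithUuid(JobLauncher jobLauncher, Job job, JobParameters jobParameters)
            throws Exception {
        JobParameters unique = new JobParametersBuilder(jobParameters)
                .addString("uuid", UUID.randomUUID().toString())
                .toJobParameters();
        return jobLauncher.run(job, unique);
    }

    // Option 2: let Batch increment a numeric "run.id" parameter on every launch.
    public JobExecution launchWithRunId(JobLauncher jobLauncher, Job job, JobParameters jobParameters)
            throws Exception {
        JobParameters next = new RunIdIncrementer().getNext(jobParameters);
        return jobLauncher.run(job, next);
    }
}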

EDIT

There are certain classes of exceptions that are considered irrecoverable (fatal), for which attempting redelivery makes no sense.

Examples include MessageConversionException: if we can't convert the message the first time, we probably won't be able to convert it on a redelivery either. The ConditionalRejectingErrorHandler is the mechanism by which we detect such exceptions and cause them to be rejected permanently (rather than redelivered).

Other exceptions cause the message to be redelivered by default. There is also a property, defaultRequeueRejected, which can be set to false to permanently reject all failures (not recommended).
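
For illustration, a minimal sketch assuming the listener container for the adapter is built explicitly (and, per the note above, this blanket policy is not recommended):

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(routingKey);          // the queue the inbound adapter listens on
container.setDefaultRequeueRejected(false);   // reject every failed delivery permanently instead of requeuing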

You can customize the error handler by subclassing its DefaultExceptionStrategy: override isUserCauseFatal(Throwable cause) to scan the cause tree for a JobInstanceAlreadyCompleteException and return true (cause.getCause().getCause() instanceof ...).
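
As a rough sketch, assuming a Spring AMQP version in which DefaultExceptionStrategy exposes isUserCauseFatal, and walking the whole cause chain rather than relying on a fixed cause.getCause().getCause() depth:

@Bean
public ErrorHandler jobLaunchErrorHandler() {
    return new ConditionalRejectingErrorHandler(
            new ConditionalRejectingErrorHandler.DefaultExceptionStrategy() {

                @Override
                protected boolean isUserCauseFatal(Throwable cause) {
                    // Treat the "job instance already complete" case as fatal so the message
                    // is rejected permanently instead of being requeued and redelivered.
                    while (cause != null) {
                        if (cause instanceof JobInstanceAlreadyCompleteException) {
                            return true;
                        }
                        cause = cause.getCause();
                    }
                    return false;
                }
            });
}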

I think it was triggered by the "Spring Batch job already running" exception.

That still indicates you somehow received a second message with the same parameters; it is a different error because the original job was still running at that point. That message was rejected (and requeued), but on a subsequent delivery you get the already-complete exception.

So I would still say the root cause of your problem is the duplicate request, but you can avoid this behavior by using a customized error handler in the channel adapter's listener container.
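
One way to wire that in, as a sketch under the assumption that the container is created explicitly and handed to the inbound adapter (the bean names are made up; batchLauncher and routingKey are the same members used in AmqpBatchLaunchIntegrationFlows above):

@Bean
public SimpleMessageListenerContainer jobLaunchListenerContainer(ConnectionFactory connectionFactory,
        ErrorHandler jobLaunchErrorHandler) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames(routingKey);              // same queue the flow currently listens on
    container.setErrorHandler(jobLaunchErrorHandler); // the ConditionalRejectingErrorHandler sketched above
    return container;
}

@Bean
public IntegrationFlow amqpLaunchSpringBatchJobFlow(SimpleMessageListenerContainer jobLaunchListenerContainer) {
    // Same flow as before, except the inbound adapter now uses the container with the custom error handler.
    return IntegrationFlows.from(Amqp.inboundAdapter(jobLaunchListenerContainer))
            .handle(message -> {
                String jobName = (String) message.getHeaders().get("job_name");
                byte[] bytes = (byte[]) message.getPayload();
                JobParameters jobParameters = SerializationUtils.deserialize(bytes);
                batchLauncher.launchJob(jobName, jobParameters);
            }).get();
}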

I would suggest you log the duplicate messages so you can work out why you are receiving them.