在 Spring 批处理 SkipListener 中获取当前 RabbitMQ 通道

Get current RabbitMQ Channel in Spring Batch SkipListener

我触发了一个 RabbitMQ Spring 批处理应用程序。

我需要 nack 异常消息。但是,Spring Batch 不会在 运行 方法之外抛出异常。因此,我需要 nack SkipListener 中的消息。我已经设法将消息标记作为 JobParameter,但我无法将 Channel 作为 JobParameter[=] 发送39=], 因为它不是 Serializable.

有什么方法可以将 Channel 传递给批处理或任何其他错误处理方法吗?

这是我现在拥有的示例。

MessageConsumer.java

import com.project.common.dto.DataImportStartDto;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {

    private final DataImportService dataImportService;

    public MessageConsumer(DataImportService dataImportService) {
        this.dataImportService = dataImportService;
    }

    @RabbitListener(queues = "etl_queue", concurrency = "1", ackMode = "MANUAL")
    public void receiveDataImportMessage(DataImportStartDto dataImportStartDto, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        dataImportService.startImport(dataImportStartDto, tag);
        channel.basicAck(tag, false);
    }

数据导入服务.java

import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

public class DataImportServiceImpl implements DataImportService {

private final JobLauncher jobLauncher;
private final Job dataImportJob;

@Service
public DataImportServiceImpl(JobLauncher jobLauncher, @Qualifier("dataImport") Job dataImportJob) {
    this.jobLauncher = jobLauncher;
    this.dataImportJob= dataImportJob;
}

@Override
public void startImport(DataImportStartDto dto, long tag) {
    jobLauncher.run(dataImportJob, buildJobParameters(dto, tag));
    }
}

private JobParameters buildJobParameters(DataImportStartDto dto, long tag) {
    return new JobParametersBuilder()
        .addString("unique", String.valueOf(UUID.randomUUID())) // needed if there are jobs with the same parameters running in parallel
        .addLong("tag", tag)
        .toJobParameters();
}

BatchExceptionHandler.java

import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@StepScope
@Component
public class BatchExceptionHandler implements SkipListener<IntervalDataWrapper<String>, IntervalDataWrapper<String>> {

    @Value("#{jobParameters['tag']}")
    private long tag; // this value is here on onSkipInRead execution


    @Override
    public void onSkipInRead(Throwable t) {
        System.out.println(t);
        // channel.basicNack(tag, false, true); // this is what I need to do here
    }

    @Override
    public void onSkipInWrite(IntervalDataWrapper<String> item, Throwable t) {
        System.out.println(t);
    }

    @Override
    public void onSkipInProcess(IntervalDataWrapper<String> item, Throwable t) {
        System.out.println(t);
    }
}

IntervalDataWrapper 是我的带有批处理配置的自定义数据包装器。

我认为您不应该在跳过侦听器中这样做 nack。这些调用不在 IMO 的同一抽象级别。我从您的实施中了解到,您希望在读取错误时 nack 。您现在的当前设置是:

@RabbitListener(queues = "etl_queue", concurrency = "1", ackMode = "MANUAL")
public void receiveDataImportMessage(DataImportStartDto dataImportStartDto, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {

   // run a job
   dataImportService.startImport(dataImportStartDto, tag); // send nack = true from within the job

   // when the job is finished, send nack = false
   channel.basicAck(tag, false);
}

如您所见,一份确认是从作业内部发送的,另一份是作业外部发送的。我会做的是,如果有一个可跳过的异常,我会将作业的退出状态设置为 FINISHED_WITH_SKIPS 之类的东西(如果需要,甚至会失败),然后检查作业的状态以了解哪种类型的应将确认发送到频道:

@RabbitListener(queues = "etl_queue", concurrency = "1", ackMode = "MANUAL")
    public void receiveDataImportMessage(DataImportStartDto dataImportStartDto, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {

   JobExecution jobExecution = dataImportService.startImport(dataImportStartDto, tag);
   if (jobExecution.getExitStatus().getExitCode().equals("FINISHED_WITH_SKIPS") {
      channel.basicNack(tag, false, true);
   } else {
      channel.basicAck(tag, false);
   };
        
}

这种方法不需要在跳过侦听器中注入通道。此外,这种方法使批处理逻辑与消息传递逻辑分离,更易于测试、部署和思考。