如何在 spring 批处理中制作非阻塞项目处理器(不仅与 TaskExecuter 异步)?

How to make a non-blocking item processor in spring batch (Not only asynchronous with a TaskExecuter)?

Spring 批次有一个名为 AsyncItemProcessor 的工具。它只是用 ItemProcessor 包装一个 运行 并用 TaskExecutor 包装它,因此它可以 运行 异步。我想在此 ItemProcessor 中进行休息调用,问题是此 TaskExecutor 中进行休息调用的每个线程都将被阻塞,直到获得响应。我想让它成为非阻塞的,类似于反应式范例。

我有一个 ItemProcessor 调用休息点并得到它的响应:

    @Bean
    public ItemProcessor<String, String> testItemProcessor() {
        return item -> {
            String url = "http://localhost:8787/test";
            try {
                // it's a long time process and take a lot of time
                String response = restTemplate.exchange(new URI(url), HttpMethod.GET, new RequestEntity(HttpMethod.GET, new URI(url)), String.class).getBody();
                return response;
            } catch (URISyntaxException e) {
                e.printStackTrace();
                return null;
            }
        };
    }

现在我用AsyncItemProcessor包裹它:

    @Bean
    public AsyncItemProcessor testAsyncItemProcessor() throws Exception {
        AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(testItemProcessor());
        asyncItemProcessor.setTaskExecutor(testThreadPoolTaskExecutor());
        asyncItemProcessor.afterPropertiesSet();
        return asyncItemProcessor;
    }

    @Bean
    public ThreadPoolTaskExecutor testThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);
        threadPoolTaskExecutor.setMaxPoolSize(100);
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        return threadPoolTaskExecutor;
    }

我用了一个ThreadPoolTaskExecutor作为TaskExecuter

这是ItemWriter:

    @Bean
    public ItemWriter<String> testItemWriter() {
        return items -> {
            // I write them to a file and a database, but for simplicity:
            for (String item : items) {
                System.out.println(item);
            }
        };
    }

    @Bean
    public AsyncItemWriter asyncTestItemWriter() throws Exception {
        AsyncItemWriter asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(testItemWriter());
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;
    }

步骤和作业配置:

    @Bean
    public Step testStep() throws Exception {
        return stepBuilderFactory.get("testStep")
                .<String, String>chunk(1000)
                .reader(testItemReader())
                .processor(testAsyncItemProcessor())
                .writer(asyncTestItemWriter())
                .build();
    }


    @Bean
    public Job testJob() throws Exception {
        return jobBuilderFactory.get("testJob")
                .start(testStep())
                .build();
    }

ItemReader 是一个简单的 ListItemReader:

    @Bean
    public ItemReader<String> testItemReader() {
        List<String> integerList = new ArrayList<>();
        for (int i=0; i<10000; i++) {
            integerList.add(String.valueOf(i));
        }
        return new ListItemReader(integerList);
    }

现在我有一个有 50~100 个线程的 ThreadPoolTaskExecutorItemProcessor 中的每个线程进行 rest 调用并 waits/blocks 接收来自服务器的响应。有没有办法让这些 calls/process 非阻塞?如果是,我应该如何设计ItemWriter?在 ItemWriter 中,我想将 ItemProcessor 的结果写入文件和数据库。 每个块的大小为 1000,我可以等到其中的所有记录都得到处理,但我不想在块内的每个 rest 调用中阻塞一个线程。有什么办法可以做到吗?

我知道 Spring rest 模板是使进程阻塞的模板,应该使用 webclient,但是 spring 批处理中是否有任何等效组件(而不是AsyncItemProcessor/AsyncItemWriter) 的 Web 客户端?

不,Spring Batch 中不支持响应式编程,这里有一个开放的功能请求:https://github.com/spring-projects/spring-batch/issues/1008

请注意,响应式意味着整个堆栈都应该是响应式的,从批处理工件(reader、处理器、编写器、侦听器等)到基础设施 bean(作业存储库、事务管理器等),不仅是您的项目处理器和作家。

此外,当前的块处理模型实际上与反应式范式不兼容。原因是 ChunkOrientedTasklet 基本上使用了两个合作者:

  • A ChunkProvider 提供项目块(将项目读取委托给 ItemReader
  • A ChunkProcessor 处理块(将处理和写入分别委托给 ItemProcessor/ItemWriter

这里是代码的简化版本:

Chunk inputs = chunkProvider.provide();
chunkProcessor.process(inputs);

如您所见,该步骤将等待 chunkProcessor(处理器 + 写入器)处理完整个块,然后再读取下一个块。因此,在您的情况下,即使您在处理器 + 编写器中使用非阻塞 API,您的步骤也会在读取下一个块之前等待块被完全处理(除了等待与作业存储库和事务管理器的阻塞交互) .