如何在我的 RemoteChunkingManagerStepBuilderFactory 上使用功能样式?

How to use the functional style on my RemoteChunkingManagerStepBuilderFactory?

我正在实施 Spring 批处理集成 RemoteChunking。 https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html#remote-chunking

我遇到了 @Input 的弃用,文档说我们必须使用函数式风格。

如何在我的 Spring 批处理集成流程中使用 Consumer(在 spring 云流中使用) ?

package pt.bayonne.sensei.RemoteChunking.manager;

import org.aspectj.bridge.Message;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.function.Consumer;
import java.util.function.Supplier;

@Profile("!worker")
@Configuration
public class FunctionalBinders {


    @Bean
    public Sinks.Many<Object> sink() {
        return Sinks.many()
                .replay()
                .latest();
    }


    @Bean
    public Supplier<Flux<Object>> clientRequests() {
        return () -> sink()
                .asFlux()
                .cache();
    }
    
    @Bean
    public Consumer<Message<?>> onClientReplies(){
        return message -> {
            //do your stuff
        };
    }
}

我的作业配置

public TaskletStep dispatchStep(){
       return this.remoteChunkingManagerStepBuilderFactory.get("dispatch-step")
                .chunk(10)
                .reader(reader())
                .outputChannel((message, timeout) -> sink.tryEmitNext(message).isSuccess())
                .inputChannel(replies()) //how to use the functional style here?
                .build();

}

我知道它需要一个 PollableChannel 但我的问题是 如何在我的 RemoteChunkingManagerStepBuilderFactory 上使用功能样式?

任何例子将不胜感激。 非常感谢。

.outputChannel((message, timeout) -> sink.tryEmitNext(message).isSuccess()) .inputChannel(replies()) //how to use the functional style here?

方法 org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilder#outputChannel(MessageChannel) 接受一个 org.springframework.messaging.MessageChannel,它是一个函数式接口(请参阅 it is annotated with @FunctionalInterface)。因此,您可以使用 lambda 在构建器中定义输出通道。

但是,org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilder#inputChannel(PollableChannel) 接受 org.springframework.messaging.PollableChannel 不是 功能接口。这就是为什么不能在这里使用 lambda 的原因。