如何在我的 RemoteChunkingManagerStepBuilderFactory 上使用功能样式?
How to use the functional style on my RemoteChunkingManagerStepBuilderFactory?
-
spring-batch
-
spring-cloud-stream
-
spring-integration-dsl
-
spring-cloud-function
-
spring-batch-integration
我正在实施 Spring 批处理集成 RemoteChunking。 https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html#remote-chunking
我遇到了 @Input 的弃用,文档说我们必须使用函数式风格。
如何在我的 Spring 批处理集成流程中使用 Consumer(在 spring 云流中使用) ?
package pt.bayonne.sensei.RemoteChunking.manager;
import org.aspectj.bridge.Message;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.function.Consumer;
import java.util.function.Supplier;
@Profile("!worker")
@Configuration
public class FunctionalBinders {
@Bean
public Sinks.Many<Object> sink() {
return Sinks.many()
.replay()
.latest();
}
@Bean
public Supplier<Flux<Object>> clientRequests() {
return () -> sink()
.asFlux()
.cache();
}
@Bean
public Consumer<Message<?>> onClientReplies(){
return message -> {
//do your stuff
};
}
}
我的作业配置
public TaskletStep dispatchStep(){
return this.remoteChunkingManagerStepBuilderFactory.get("dispatch-step")
.chunk(10)
.reader(reader())
.outputChannel((message, timeout) -> sink.tryEmitNext(message).isSuccess())
.inputChannel(replies()) //how to use the functional style here?
.build();
}
我知道它需要一个 PollableChannel 但我的问题是 如何在我的 RemoteChunkingManagerStepBuilderFactory 上使用功能样式?
任何例子将不胜感激。
非常感谢。
.outputChannel((message, timeout) -> sink.tryEmitNext(message).isSuccess())
.inputChannel(replies()) //how to use the functional style here?
方法 org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilder#outputChannel(MessageChannel)
接受一个 org.springframework.messaging.MessageChannel
,它是一个函数式接口(请参阅 it is annotated with @FunctionalInterface
)。因此,您可以使用 lambda 在构建器中定义输出通道。
但是,org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilder#inputChannel(PollableChannel)
接受 org.springframework.messaging.PollableChannel
不是 功能接口。这就是为什么不能在这里使用 lambda 的原因。
spring-batch
spring-cloud-stream
spring-integration-dsl
spring-cloud-function
spring-batch-integration
我正在实施 Spring 批处理集成 RemoteChunking。 https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html#remote-chunking
我遇到了 @Input 的弃用,文档说我们必须使用函数式风格。
如何在我的 Spring 批处理集成流程中使用 Consumer(在 spring 云流中使用) ?
package pt.bayonne.sensei.RemoteChunking.manager;
import org.aspectj.bridge.Message;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.function.Consumer;
import java.util.function.Supplier;
@Profile("!worker")
@Configuration
public class FunctionalBinders {
@Bean
public Sinks.Many<Object> sink() {
return Sinks.many()
.replay()
.latest();
}
@Bean
public Supplier<Flux<Object>> clientRequests() {
return () -> sink()
.asFlux()
.cache();
}
@Bean
public Consumer<Message<?>> onClientReplies(){
return message -> {
//do your stuff
};
}
}
我的作业配置
public TaskletStep dispatchStep(){
return this.remoteChunkingManagerStepBuilderFactory.get("dispatch-step")
.chunk(10)
.reader(reader())
.outputChannel((message, timeout) -> sink.tryEmitNext(message).isSuccess())
.inputChannel(replies()) //how to use the functional style here?
.build();
}
我知道它需要一个 PollableChannel 但我的问题是 如何在我的 RemoteChunkingManagerStepBuilderFactory 上使用功能样式?
任何例子将不胜感激。 非常感谢。
.outputChannel((message, timeout) -> sink.tryEmitNext(message).isSuccess()) .inputChannel(replies()) //how to use the functional style here?
方法 org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilder#outputChannel(MessageChannel)
接受一个 org.springframework.messaging.MessageChannel
,它是一个函数式接口(请参阅 it is annotated with @FunctionalInterface
)。因此,您可以使用 lambda 在构建器中定义输出通道。
但是,org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilder#inputChannel(PollableChannel)
接受 org.springframework.messaging.PollableChannel
不是 功能接口。这就是为什么不能在这里使用 lambda 的原因。