将 Spring 集成 executorChannel 与 Spring 云函数结合使用
Using Spring Integration executorChannel with Spring Cloud Function
我正在使用 spring 云函数通过 Flux 处理来自 kafka 的数据。默认情况下,它在消费者线程(消费消息的地方)中处理数据。我将为并行数据处理和节流实施线程池,Spring Cloud Integration 中有一个很棒的实施,称为 executorChannel (https://docs.spring.io/spring-integration/api/org/springframework/integration/channel/ExecutorChannel.html)
函数实现示例:
public static class FN1 implements Function<Flux<String>, Flux<String>> {
public Flux<String> apply(Flux<String> data) {
return data
.map(f -> doSomething() )
}
}
所以我发现没有简单的方法来连接通过 executorChannel 实现的函数。
M.b。有一种方法可以定义 inputChannel 类型吗?
更新:阅读奥列格回答下的评论。它们非常有用。
你的意思是这样的?
@SpringBootApplication
public class SampleFunctoinAppApplication {
public static void main(String[] args) throws Exception {
ApplicationContext context = SpringApplication.run(SampleFunctoinAppApplication.class, args);
SubscribableChannel output = context.getBean("output", SubscribableChannel.class);
output.subscribe(System.out::println);
MessageChannel channel = context.getBean("executorChannel", MessageChannel.class);
channel.send(new GenericMessage<String>("hello"));
}
@Bean
public IntegrationFlow flow() {
return IntegrationFlows
.from("executorChannel")
.transform(echo())
.channel("output")
.get();
}
@Bean
public ExecutorChannel executorChannel() {
return new ExecutorChannel(Executors.newCachedThreadPool());
}
public Function<String, String> echo() {
return v -> v;
}
}
"define inputChannel type" 是什么意思?
我正在使用 spring 云函数通过 Flux 处理来自 kafka 的数据。默认情况下,它在消费者线程(消费消息的地方)中处理数据。我将为并行数据处理和节流实施线程池,Spring Cloud Integration 中有一个很棒的实施,称为 executorChannel (https://docs.spring.io/spring-integration/api/org/springframework/integration/channel/ExecutorChannel.html)
函数实现示例:
public static class FN1 implements Function<Flux<String>, Flux<String>> {
public Flux<String> apply(Flux<String> data) {
return data
.map(f -> doSomething() )
}
}
所以我发现没有简单的方法来连接通过 executorChannel 实现的函数。
M.b。有一种方法可以定义 inputChannel 类型吗?
更新:阅读奥列格回答下的评论。它们非常有用。
你的意思是这样的?
@SpringBootApplication
public class SampleFunctoinAppApplication {
public static void main(String[] args) throws Exception {
ApplicationContext context = SpringApplication.run(SampleFunctoinAppApplication.class, args);
SubscribableChannel output = context.getBean("output", SubscribableChannel.class);
output.subscribe(System.out::println);
MessageChannel channel = context.getBean("executorChannel", MessageChannel.class);
channel.send(new GenericMessage<String>("hello"));
}
@Bean
public IntegrationFlow flow() {
return IntegrationFlows
.from("executorChannel")
.transform(echo())
.channel("output")
.get();
}
@Bean
public ExecutorChannel executorChannel() {
return new ExecutorChannel(Executors.newCachedThreadPool());
}
public Function<String, String> echo() {
return v -> v;
}
}
"define inputChannel type" 是什么意思?