函数式编程模型 bean 定义和 spring-cloud-function + spring-cloud-stream 集成
Functional programming model bean definition and spring-cloud-function + spring-cloud-stream integration
大家好,特别是 spring 团队!
如何在函数式 Bean 编程模型风格中将 spring-cloud-function 与 spring-cloud-stream 流水线化?
例如,我有 pom.xml 两个依赖项:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-function-webflux</artifactId>
</dependency>
假设我下一步想做:
- 通过 spring-cloud-function (webflux)
通过 http 负载字符串发送
- 使用我的 toUpperCase 函数将其大写
- 最后发送到我的管道中以安装活页夹 (kafka/rabbit/test-binder)
所以我希望像这样实现它:
@Log4j2
@SpringBootApplication
public class SpringCloudFunctionStreamApplication {
/**
* can I sent result of that function to my broker without any
* explicitly defined output.send(...) execution?
*/
@Bean
public Function<String, String> toUpperCase() {
return arg -> {
var res = arg.toUpperCase();
log.info("toUpperCase: {}", res);
return res;
};
}
public static void main(String[] args) {
SpringApplication.run(
SpringCloudFunctionStreamApplication.class,
"--spring.cloud.function.definition=toUpperCase",
"--spring.cloud.stream.function.definition=toUpperCase"
);
}
}
所以当我使用 HTTPie 发送负载时,就像这样:
echo 'hello' | http :8080/toUpperCase
spring-cloud-function 似乎工作正常,我可以看到预期的日志:
2019-06-09 21:20:36.978 ...SpringCloudFunctionStreamApplication : toUpperCase: hello
同样认为如果我通过 rabbitmq 管理网络发布消息 ui,但我如何从一个管道到另一个管道
所以我的问题与 according to spring documentation which says that I can use spring-cloud-stream as well: 函数、消费者和供应商类型的@Beans 的包装器有关,将它们作为 HTTP 端点 and/or 消息流 listeners/publishers 暴露给外界RabbitMQ、Kafka 等,但我怎么看不懂?
目前,不幸的是,我只能使用 Source see example here 手动将消息发布到 spring-cloud-stream binder,但这当然是我想知道是否可以避免的spring,神奇地...
任何人都可以告诉我(可能是 Gary Russell、Dave Sawyer、Artem Bilan、Oleg Zhurakousky 或任何其他知道的人):我错过了什么以及我应该如何配置我的应用程序或我应该在我的 application.properties, 等等?
谢谢!
更新
有一段时间了,但我决定post这里有一个解决方案...
简而言之:固定提交在这里:https://github.com/daggerok/spring-cloud-function-stream-integration/commit/35325465b81bb869c31ec7892f413ab891d6d0fd
所以基本上使用 StreamBridge 我可以连接 spring-cloud-function 和 spring-cloud-stream ...直接来自我的 spring-cloud-function
在提到的更新存储库中查看详细信息(如果您愿意,可以修复分支)
干杯!
此致,
马克西姆
马克西姆
这并不理想,但鉴于我们在现有绑定器的范围内实现了初始功能支持,因此存在一些限制。我会解释,但首先这里是功能齐全的代码:
@SpringBootApplication
public class SimpleFunctionRabbitDemoApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(SimpleFunctionRabbitDemoApplication.class,
"--spring.cloud.stream.function.definition=uppercase");
}
@Autowired
private Processor processor;
@Bean
public Consumer<String> consume() {
return v -> processor.input().send(MessageBuilder.withPayload(v).build());
}
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
}
基本上有点不匹配。在流端我们有绑定器,在功能端我们有适配器。您正在有效地(根据您的要求)尝试将两者连接成一条管道。所以。 . .
我们先看看活页夹。
大写函数绑定到消息通道绑定器(rabbit 或 kafka)提供的 input
和 output
通道,有效地创建了一个内部管道 input -> uppercase -> output
。 s-c-function 也将其公开为 REST 端点,但是,s-c-function 无权访问上述管道。事实上,它实际上有自己的管道 request -> uppercase -> reply
。
所以我们需要做的是将这两个概念联系在一起,而这正是我所做的。
您使用 Processor
绑定注入您的应用程序,其中包含对 uppercase
绑定到的频道的引用。
您通过 REST http://localhost:8080/consume/blah
调用 consume()
。
您向 uppercase
函数的输入通道发送了一条消息
将来为了简化这个,我们只需要创建一个 binder-like 版本的网络适配器,所以请随时提出功能请求。但正如您所看到的,当前的解决方法还不止于此。
这个问题更像是 Oleg Zhurakousky 的问题。如果回答,将很高兴
如果我使用 @Bean Supplier<Pojo>...
绑定输出目标,如何在每次新的 @Service
class 或 @Controller
class 中调用它Pojo
将发送到 Kafka/Rabbit。
Supplier
只公开了一个 get()
方法。
我只编写生产者,它将向 Kafka 编写自定义 Pojo
,而不同的应用程序是消费者。 Consumer<Pojo>...
的功能方法更清晰,它只从 Kafka 和进程中读取。生产者的 Supplier<Pojo>...
部分不清楚。
@Abhishek
您可以按照 here 所述使用 EmitterProcessor
。提供的示例使用休息端点作为他的实际数据源,但如您所见,它不会计量,因为您需要做的就是调用 EmitterProcessor
的 onNext
操作,从服务传递您的事件。
大家好,特别是 spring 团队!
如何在函数式 Bean 编程模型风格中将 spring-cloud-function 与 spring-cloud-stream 流水线化?
例如,我有 pom.xml 两个依赖项:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-function-webflux</artifactId>
</dependency>
假设我下一步想做:
- 通过 spring-cloud-function (webflux) 通过 http 负载字符串发送
- 使用我的 toUpperCase 函数将其大写
- 最后发送到我的管道中以安装活页夹 (kafka/rabbit/test-binder)
所以我希望像这样实现它:
@Log4j2
@SpringBootApplication
public class SpringCloudFunctionStreamApplication {
/**
* can I sent result of that function to my broker without any
* explicitly defined output.send(...) execution?
*/
@Bean
public Function<String, String> toUpperCase() {
return arg -> {
var res = arg.toUpperCase();
log.info("toUpperCase: {}", res);
return res;
};
}
public static void main(String[] args) {
SpringApplication.run(
SpringCloudFunctionStreamApplication.class,
"--spring.cloud.function.definition=toUpperCase",
"--spring.cloud.stream.function.definition=toUpperCase"
);
}
}
所以当我使用 HTTPie 发送负载时,就像这样:
echo 'hello' | http :8080/toUpperCase
spring-cloud-function 似乎工作正常,我可以看到预期的日志:
2019-06-09 21:20:36.978 ...SpringCloudFunctionStreamApplication : toUpperCase: hello
同样认为如果我通过 rabbitmq 管理网络发布消息 ui,但我如何从一个管道到另一个管道
所以我的问题与 according to spring documentation which says that I can use spring-cloud-stream as well: 函数、消费者和供应商类型的@Beans 的包装器有关,将它们作为 HTTP 端点 and/or 消息流 listeners/publishers 暴露给外界RabbitMQ、Kafka 等,但我怎么看不懂?
目前,不幸的是,我只能使用 Source see example here 手动将消息发布到 spring-cloud-stream binder,但这当然是我想知道是否可以避免的spring,神奇地...
任何人都可以告诉我(可能是 Gary Russell、Dave Sawyer、Artem Bilan、Oleg Zhurakousky 或任何其他知道的人):我错过了什么以及我应该如何配置我的应用程序或我应该在我的 application.properties, 等等?
谢谢!
更新
有一段时间了,但我决定post这里有一个解决方案...
简而言之:固定提交在这里:https://github.com/daggerok/spring-cloud-function-stream-integration/commit/35325465b81bb869c31ec7892f413ab891d6d0fd
所以基本上使用 StreamBridge 我可以连接 spring-cloud-function 和 spring-cloud-stream ...直接来自我的 spring-cloud-function
在提到的更新存储库中查看详细信息(如果您愿意,可以修复分支)
干杯!
此致, 马克西姆
马克西姆
这并不理想,但鉴于我们在现有绑定器的范围内实现了初始功能支持,因此存在一些限制。我会解释,但首先这里是功能齐全的代码:
@SpringBootApplication
public class SimpleFunctionRabbitDemoApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(SimpleFunctionRabbitDemoApplication.class,
"--spring.cloud.stream.function.definition=uppercase");
}
@Autowired
private Processor processor;
@Bean
public Consumer<String> consume() {
return v -> processor.input().send(MessageBuilder.withPayload(v).build());
}
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
}
基本上有点不匹配。在流端我们有绑定器,在功能端我们有适配器。您正在有效地(根据您的要求)尝试将两者连接成一条管道。所以。 . .
我们先看看活页夹。
大写函数绑定到消息通道绑定器(rabbit 或 kafka)提供的 input
和 output
通道,有效地创建了一个内部管道 input -> uppercase -> output
。 s-c-function 也将其公开为 REST 端点,但是,s-c-function 无权访问上述管道。事实上,它实际上有自己的管道 request -> uppercase -> reply
。
所以我们需要做的是将这两个概念联系在一起,而这正是我所做的。
您使用
Processor
绑定注入您的应用程序,其中包含对uppercase
绑定到的频道的引用。您通过 REST
http://localhost:8080/consume/blah
调用consume()
。您向
uppercase
函数的输入通道发送了一条消息
将来为了简化这个,我们只需要创建一个 binder-like 版本的网络适配器,所以请随时提出功能请求。但正如您所看到的,当前的解决方法还不止于此。
这个问题更像是 Oleg Zhurakousky 的问题。如果回答,将很高兴
如果我使用 @Bean Supplier<Pojo>...
绑定输出目标,如何在每次新的 @Service
class 或 @Controller
class 中调用它Pojo
将发送到 Kafka/Rabbit。
Supplier
只公开了一个 get()
方法。
我只编写生产者,它将向 Kafka 编写自定义 Pojo
,而不同的应用程序是消费者。 Consumer<Pojo>...
的功能方法更清晰,它只从 Kafka 和进程中读取。生产者的 Supplier<Pojo>...
部分不清楚。
@Abhishek
您可以按照 here 所述使用 EmitterProcessor
。提供的示例使用休息端点作为他的实际数据源,但如您所见,它不会计量,因为您需要做的就是调用 EmitterProcessor
的 onNext
操作,从服务传递您的事件。