How to configure the bindings of a function in Spring Cloud Stream to bind its input to a web endpoint and its output to a Kafka topic
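I have a plain Java Function that I am trying to bind:
- its input to a web endpoint
- its output to a Kafka topic.
When I use my function in a web context, it always returns the resulting value of the Function to the web client only. Can I do something like this?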
spring.cloud.stream.bindings.input.binder=web
spring.cloud.stream.bindings.output.binder=kafka
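I am currently trying to split the Function in two:
- one function with its input bound to the web client and its output dynamically routed to the second function (using spring.cloud.stream.sendto.destination)
- another function with its output bound to the Kafka binding.
This approach does not work either. The dynamic routing header (spring.cloud.stream.sendto.destination) shows up on the web client, but no Message is sent to the Kafka binding itself. Here is the code I am using for this second approach (two functions), in the hope of simply getting a Spring functional application that binds its input to a web endpoint and its output to a Kafka topic.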
WebToKafkaApp.java
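import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import reactor.core.publisher.Flux;

@SpringBootApplication
public class WebToKafkaApp {

    public static void main(String[] args) {
        SpringApplication.run(WebToKafkaApp.class, args);
    }

    // Exposed over HTTP; tries to route its output to kafkaFunction via the sendto header
    @Bean
    public Function<String, Message<String>> webFunction() {
        return payload -> createPayloadMapperToMessage("kafkaFunction").apply(payload);
    }

    // Intended to forward whatever it receives to the Kafka binding
    @Bean
    public Function<Flux<Message<String>>, Flux<Message<String>>> kafkaFunction() {
        return flux -> flux.map(msg -> createPayloadMapperToMessage("").apply(msg.getPayload()));
    }

    // Upper-cases the payload and sets the dynamic destination header
    private Function<String, Message<String>> createPayloadMapperToMessage(String destination) {
        return payload -> MessageBuilder
                .withPayload(payload.toUpperCase())
                .setHeader("spring.cloud.stream.sendto.destination", destination)
                .build();
    }
}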
application.yml
spring.cloud.stream.bindings.webFunction-in-0:
  destination: webFunctionIN
  contentType: application/json
spring.cloud.stream.bindings.webFunction-out-0:
  destination: webFunctionOUT
  contentType: application/json
spring.cloud.stream.bindings.kafkaFunction-in-0:
  destination: kafkaFunctionIN
  contentType: application/json
  binder: kafka
spring.cloud.stream.bindings.kafkaFunction-out-0:
  destination: kafkaFunctionOUT
  contentType: application/json
  binder: kafka
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
spring.cloud.stream.function.routing.enabled: true
spring.cloud.function.definition: webFunction
build.gradle
plugins {
    id 'org.springframework.boot' version '2.2.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "Hoxton.RELEASE")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.cloud:spring-cloud-starter-function-web'
    implementation 'org.springframework.cloud:spring-cloud-starter-function-webflux'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    useJUnitPlatform()
}
Any help would be appreciated.
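Thanks to Oleg for his suggestion. Essentially, I enhanced his proposal to generically handle the bridge between:
- a functional web controller, which can receive web requests;
- a stream supplier, which can forward any message to the messaging infrastructure.
This solution encapsulates the concerns he described in a custom implementation of Supplier. The implementation exposes an API that triggers the Supplier to emit a message passed as a parameter. Such a class would look like the following: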
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.function.Supplier;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

public class StreamSupplier implements Supplier<Flux<?>> {

    private static final String SPRING_CLOUD_STREAM_SENDTO_DESTINATION =
            "spring.cloud.stream.sendto.destination";

    public static <T> Message<?> createMessage(T payload, String destination) {
        MessageBuilder<T> builder = MessageBuilder.withPayload(payload);
        // Only set the routing header when a destination was actually given
        if (destination != null && !destination.isEmpty()) {
            builder.setHeader(SPRING_CLOUD_STREAM_SENDTO_DESTINATION, destination);
        }
        return builder.build();
    }

    private final String defaultDestination;
    private final EmitterProcessor<? super Object> processor = EmitterProcessor.create();

    public StreamSupplier() {
        this(null);
    }

    public StreamSupplier(String defaultDestination) {
        this.defaultDestination = defaultDestination;
    }

    // SEND APIs

    public <T> Message<?> sendMessage(T payload) {
        return sendMessage(payload, defaultDestination);
    }

    public <T> Message<?> sendMessage(T payload, String destination) {
        return sendBody(createMessage(payload, destination));
    }

    public <T> T sendBody(T body) {
        processor.onNext(body);
        return body;
    }

    /**
     * Returns {@link EmitterProcessor} used internally to programmatically publish messages onto
     * the output binding associated with this {@link Supplier}. Such programmatic publications
     * are available through the {@code sendXXX} API methods available in this class.
     */
    @Override
    public Flux<?> get() {
        return processor;
    }
}
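To see the mechanism in isolation, here is a dependency-free sketch of the same idea. It is not the class above: Reactor's EmitterProcessor is swapped for a plain list of subscribers, and the names BridgeSketch, PushChannel, subscribe, send and demo are hypothetical, chosen only for illustration. The in-memory list plays the role of the output binding, and the web function is an ordinary Function that pushes into the channel and returns its input to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;

public class BridgeSketch {

    // Hypothetical stand-in for StreamSupplier: send() pushes to every subscriber.
    static final class PushChannel<T> {
        private final List<Consumer<? super T>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<? super T> subscriber) {
            subscribers.add(subscriber);
        }

        // Mirrors sendBody(): emit the value, then hand it back to the caller.
        T send(T body) {
            subscribers.forEach(s -> s.accept(body));
            return body;
        }
    }

    // Runs one request through the bridge and returns what the "topic" received.
    static List<String> demo(String request) {
        PushChannel<String> channel = new PushChannel<>();
        List<String> topic = new ArrayList<>();   // stands in for the output binding
        channel.subscribe(topic::add);            // in the real setup the framework subscribes

        Function<String, String> webFunction = channel::send; // the "web" side
        webFunction.apply(request);
        return topic;
    }

    public static void main(String[] args) {
        System.out.println(demo("hello")); // prints [hello]
    }
}
```

In the real class, the subscriber is not registered by hand: Spring Cloud Stream calls get() on the reactive Supplier and subscribes to the returned Flux, so anything passed to sendBody ends up on the supplier's output binding.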
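Developers then only have to:
- register an instance of this Supplier implementation as a bean in the Spring application, and let spring-cloud-function scan this bean into the FunctionCatalog;
- create a web function that uses the previously registered Supplier to forward any message to the streaming infrastructure, which can be configured using all the features of spring-cloud-stream.
The following example demonstrates this: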
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Controller;

import java.util.function.Function;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;

@SpringBootApplication
@Controller
public class MyApp {

    public static void main(String[] args) {
        SpringApplication.run(MyApp.class,
                "--spring.cloud.function.definition=streamSupplierFunction;webToStreamFunction");
    }

    // Functional Web Controller
    @Bean
    public Function<String, String> webToStreamFunction() {
        return msg -> streamSupplier().sendBody(msg);
    }

    // Functional Stream Supplier
    @Bean
    public Supplier<Flux<?>> streamSupplierFunction() {
        return new StreamSupplier();
    }

    // DOUBLE REGISTRATION TO AVOID POLLABLE CONFIGURATION
    // LIMITATION OF SPRING-CLOUD-FUNCTION
    @Bean
    public StreamSupplier streamSupplier() {
        return (StreamSupplier) streamSupplierFunction();
    }
}
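The supplier's output still has to be pointed at a Kafka topic. Following the functional binding naming already used in the question (the bean name followed by -out-0), a configuration fragment might look like the sketch below. The topic name web-events is a hypothetical placeholder, and the broker address is copied from the question's application.yml.

```properties
# Output binding of the Supplier bean named streamSupplierFunction
spring.cloud.stream.bindings.streamSupplierFunction-out-0.destination=web-events
spring.cloud.stream.bindings.streamSupplierFunction-out-0.binder=kafka
# Broker address as in the question; adjust for your environment
spring.cloud.stream.kafka.binder.brokers=localhost:9092
```

With the function web starter on the classpath, webToStreamFunction should then be reachable over HTTP under its bean name (for example a POST to /webToStreamFunction), and each posted body would be forwarded to the configured topic.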
Thanks again to Oleg for providing what I was looking for to build this comprehensive solution.