Can I use a spring cloud stream function definition for multiple tenants (kafka binder)?
I have a stream processor that processes messages from a Kafka InputTopic to an OutputTopic. In addition, I have multiple tenants for which this processing should run. Let's call them tenant A and tenant B, but there could be a dozen or more tenants the application has to handle. The input and output topics follow a naming convention: A-input, B-input, ... and A-output, B-output, ...
The function definition looks like this:
@Configuration
public class StreamProcessorConfig {

    @Bean
    public Function<KStream<String, InputType>, KStream<String, OutputType>> myfunctiondefinition() {
        return inputTypeStream -> inputTypeStream.map((String k, InputType v) -> {
            return KeyValue.pair(k, OutputType.createFrom(v));
        });
    }
}
My application.yaml currently configures the stream application for tenant A:
tenant: A
spring.cloud.function.definition: myfunctiondefinition
spring.cloud.stream.kafka.streams.binder.functions.myfunctiondefinition:
applicationId: ${spring.application.name}-myfunctiondefinition
spring.cloud.stream.bindings.myfunctiondefinition-in-0:
destination: ${tenant}-input
spring.cloud.stream.bindings.myfunctiondefinition-out-0:
destination: ${tenant}-output
How can I modify the configuration to add an instance for tenant B? Of course, I could duplicate myfunctiondefinition() along with all its configuration keys, but I am looking for a way to add and remove tenants quickly and cleanly through configuration alone. Is that possible?
Note: running a separate application instance for tenant B (and every further tenant) is unfortunately not an option.
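For illustration, the copy-everything approach I want to avoid would look roughly like this (the second bean method myfunctiondefinition2() and all names below are hypothetical):

```yaml
# Hypothetical duplicated configuration for tenant B; additionally requires
# a copied @Bean method myfunctiondefinition2() in StreamProcessorConfig
spring.cloud.function.definition: myfunctiondefinition;myfunctiondefinition2
spring.cloud.stream.kafka.streams.binder.functions.myfunctiondefinition2:
  applicationId: ${spring.application.name}-myfunctiondefinition2
spring.cloud.stream.bindings.myfunctiondefinition2-in-0:
  destination: B-input
spring.cloud.stream.bindings.myfunctiondefinition2-out-0:
  destination: B-output
```

Every new tenant would multiply both the Java code and the configuration keys, which is exactly the duplication I am trying to avoid.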
We found a solution to this problem by registering the function beans manually. Unfortunately, it was not as easy as we had hoped. FunctionDetectorCondition (https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/main/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/FunctionDetectorCondition.java) requires an AnnotatedBeanDefinition that serves as a template for the actual stream-processing beans. Take this as a suggestion to Spring Cloud Stream to support registering a function definition template that can be used multiple times.
To achieve this, we initialize a factory bean instead of the stream processor function itself:
@Configuration
public class StreamProcessorConfig {

    @Bean
    public MyFunctionDefinitionFactory myFunctionDefinitionFactory() {
        return new MyFunctionDefinitionFactory();
    }
}
The factory creates the stream processor function:
public class MyFunctionDefinitionFactory {

    public Function<KStream<String, InputType>,
                    KStream<String, OutputType>> myfunctiondefinition() {
        return inputTypeStream -> inputTypeStream.map((String k, InputType v) -> {
            return KeyValue.pair(k, OutputType.createFrom(v));
        });
    }
}
Next we need a dummy bean class, which Spring Cloud Stream requires in order to apply its logic for creating the stream processor:
// Behaves as a dummy bean for Spring Cloud Stream.
// Has to have the same name as the original streaming function in the factory.
// In this case we named the method "myfunctiondefinition",
// so the dummy bean has to get the name "Myfunctiondefinition".
public class Myfunctiondefinition implements Function<KStream<String, InputType>,
        KStream<String, OutputType>> {

    // !!! Changes may be needed if Spring Cloud Stream changes this logic.
    // The method myfunctiondefinition() is needed because Spring Cloud Stream
    // searches for a method with the same name as the class in
    // FunctionDetectorCondition#pruneFunctionBeansForKafkaStreams.
    public Function<KStream<String, InputType>,
                    KStream<String, OutputType>> myfunctiondefinition() {
        return null;
    }

    // Needed for the interface implementation. Spring Cloud Stream needs
    // the Function interface to identify a stream processor candidate.
    @Override
    public KStream<String, OutputType> apply(KStream<String, InputType> input) {
        return null;
    }
}
Now everything is in place and we can register one bean per tenant. We do this in an ApplicationContextInitializer, which creates the bean definition via the factory method and iterates over the functions we define in the configuration file application.yaml:
public class StreamProcessorInitializer
        implements ApplicationContextInitializer<GenericWebApplicationContext> {

    @Override
    public void initialize(GenericWebApplicationContext context) {
        String functionDefinitions = context.getEnvironment()
                .getProperty("spring.cloud.function.definition");
        String splitter = context.getEnvironment()
                .getProperty("spring.cloud.function.definition.splitter");
        String factoryName = CaseFormat.UPPER_CAMEL
                .to(CaseFormat.LOWER_CAMEL, MyFunctionDefinitionFactory.class.getSimpleName());
        // Caution: getMethods() order is not guaranteed by the JVM;
        // this works here because the factory declares only one public method.
        String factoryMethodName =
                MyFunctionDefinitionFactory.class.getMethods()[0].getName();
        AnnotatedGenericBeanDefinition def =
                new AnnotatedGenericBeanDefinition(Myfunctiondefinition.class);
        def.setFactoryBeanName(factoryName);
        def.setFactoryMethodName(factoryMethodName);
        Arrays.stream(functionDefinitions.split(splitter))
                .forEach(function -> context.registerBeanDefinition(function, def));
    }
}
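One detail left implicit above: Spring Boot does not discover an ApplicationContextInitializer on its own. One way to register it is a META-INF/spring.factories entry (the package com.example below is a placeholder); alternatively, the context.initializer.classes property or SpringApplicationBuilder#initializers can be used:

```properties
# src/main/resources/META-INF/spring.factories
# Registers the initializer so Spring Boot invokes it before the context refreshes
org.springframework.context.ApplicationContextInitializer=\
  com.example.StreamProcessorInitializer
```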
Finally, we can define the functions dynamically in application.yaml. The concrete tenant environment can be configured via Helm or Kustomize:
#--------------------------------------------------------------------------------------------------------------------------------------
# streaming processor functions (going to be filled by helm)
#--------------------------------------------------------------------------------------------------------------------------------------
spring.cloud.function.definition: <name1>,<name2>,...
#--Note-- required as spring cloud streams has changed the splitter in the past
spring.cloud.function.definition.splitter: ;
# Properties per function (<name>)
spring.cloud.stream.kafka.streams.binder.functions.<name>.applicationId: ${tenant}-${spring.application.name}-<name>
# configuring dlq (if you have one)
spring.cloud.stream.kafka.streams.bindings.<name>-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.<name>-in-0.consumer.dlqName: ${tenant}-<name>-dlq
# configuring in- and output topics
spring.cloud.stream.bindings.<name>-in-0.destination: ${tenant}-<inputname>
spring.cloud.stream.bindings.<name>-out-0.destination: ${tenant}-<outputname>
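A concrete rendering of the template above for tenants A and B might look like this. All names (processorA, processorB, myapp) are illustrative; the function names must line up with the bean names the initializer registers:

```yaml
spring.cloud.function.definition: processorA;processorB
spring.cloud.function.definition.splitter: ;
spring.cloud.stream.kafka.streams.binder.functions.processorA.applicationId: A-myapp-processorA
spring.cloud.stream.kafka.streams.binder.functions.processorB.applicationId: B-myapp-processorB
spring.cloud.stream.bindings.processorA-in-0.destination: A-input
spring.cloud.stream.bindings.processorA-out-0.destination: A-output
spring.cloud.stream.bindings.processorB-in-0.destination: B-input
spring.cloud.stream.bindings.processorB-out-0.destination: B-output
```

Adding a tenant C is then purely a configuration change: Helm or Kustomize appends one more function name and its bindings, with no Java changes.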