Can I use a spring cloud stream function definition for multiple tenants (kafka binder)?

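I have a stream processor that processes messages from a Kafka InputTopic to an OutputTopic. Furthermore, I have multiple tenants for which this processing has to take place. Let's call them tenant A and tenant B, but there may be a dozen or more tenants that the application should handle. The input and output topics follow a naming convention: A-input, B-input, ... and A-output, B-output, ...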

The function definition looks like this:

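import java.util.function.Function;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StreamProcessorConfig {

    @Bean
    public Function<KStream<String, InputType>, KStream<String, OutputType>> myfunctiondefinition() {
        // Keep the record key and convert each InputType value into an OutputType value
        return inputTypeStream -> inputTypeStream.map((String k, InputType v) -> {
            return KeyValue.pair(k, OutputType.createFrom(v));
        });
    }

}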

My application.yaml currently configures the streaming application for tenant A:

tenant: A

spring.cloud.function.definition: myfunctiondefinition
spring.cloud.stream.kafka.streams.binder.functions.myfunctiondefinition:
    applicationId: ${spring.application.name}-myfunctiondefinition

spring.cloud.stream.bindings.myfunctiondefinition-in-0:
  destination: ${tenant}-input
spring.cloud.stream.bindings.myfunctiondefinition-out-0:
  destination: ${tenant}-output

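How do I modify the configuration to add an instance for tenant B? Of course I could duplicate myfunctiondefinition() as well as all configuration keys, but I am looking for a way to add and remove tenants quickly and dynamically through configuration alone. Is this possible?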

Note: running another instance of the application for tenant B and the other tenants is unfortunately not an option.

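We found a solution to this problem by registering the function beans manually. Unfortunately, this was not as easy as we expected. FunctionDetectorCondition (https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/main/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/FunctionDetectorCondition.java) requires an AnnotatedBeanDefinition that serves as a template for the actual stream processing bean. This could be taken as a feature suggestion for Spring Cloud Stream: a way to register a function definition template that can be used multiple times.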

To achieve this, we initialize a factory bean instead of the stream processor function itself:

@Configuration
public class StreamProcessorConfig {

    @Bean
    public MyFunctionDefinitionFactory myFunctionDefinitionFactory() {
        return new MyFunctionDefinitionFactory();
    }
}

The factory creates the stream processor function:

public class MyFunctionDefinitionFactory {

    public Function<KStream<String, InputType>, 
               KStream<String, OutputType>> myfunctiondefinition() {
        return inputTypeStream -> inputTypeStream.map((String k, InputType v) -> {
            return KeyValue.pair(k, OutputType.createFrom(v));
        });
    }
}

Now we need a dummy bean class, which Spring Cloud Stream requires in order to apply its logic for creating the stream processors:

// Behaves as dummy bean for spring cloud stream
// Has to be the same name as the original streaming function in the factory.
// In this case we named the method "myfunctiondefinition",
// so the dummy-bean has to get the name "Myfunctiondefinition".
public class Myfunctiondefinition implements Function<KStream<String, InputType>, 
                KStream<String, OutputType>> {

    // !!! It could be that changes are needed if spring cloud streams changes the logic
    // Method myfunctiondefinition() is needed, because spring cloud streams searches for 
    // a method with the same name as the class in 
    // FunctionDetectorCondition:pruneFunctionBeansForKafkaStreams
    public Function<KStream<String, InputType>, 
               KStream<String, OutputType>> myfunctiondefinition() {
        return null;
    }

    // Needed for the interface implementation. Spring cloud streams needs
    // the class Function to identify a stream processor candidate.
    @Override
    public KStream<String, OutputType> apply(KStream<String, InputType> input) {
        return null;
    }
}
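The dummy class only works as long as its name, its method and the factory method stay in sync, so a rename can easily break the setup. The sketch below is a hypothetical startup check (DummyBeanNamingCheck and its methods are made-up names, not part of Spring Cloud Stream) that encodes the naming rule stated in the comments above, assuming that rule is simply "decapitalized class name equals method name". It uses only JDK reflection.

```java
import java.util.function.Function;

// Hypothetical helper, not a Spring Cloud Stream API: it only checks the naming
// rule described in the comments of the dummy class.
final class DummyBeanNamingCheck {

    private DummyBeanNamingCheck() {
    }

    // "Myfunctiondefinition" -> "myfunctiondefinition"
    static String decapitalize(String name) {
        if (name.isEmpty()) {
            return name;
        }
        return Character.toLowerCase(name.charAt(0)) + name.substring(1);
    }

    // True if the type exposes a public method with this name and no parameters
    static boolean hasPublicNoArgMethod(Class<?> type, String methodName) {
        try {
            type.getMethod(methodName);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // The dummy class must implement Function, and both the dummy class and the
    // factory must expose a method named like the decapitalized dummy class name
    static boolean isConsistent(Class<?> dummyClass, Class<?> factoryClass) {
        String expected = decapitalize(dummyClass.getSimpleName());
        return Function.class.isAssignableFrom(dummyClass)
                && hasPublicNoArgMethod(dummyClass, expected)
                && hasPublicNoArgMethod(factoryClass, expected);
    }
}
```

Called as `DummyBeanNamingCheck.isConsistent(Myfunctiondefinition.class, MyFunctionDefinitionFactory.class)` at the start of the initializer, it would let the application fail fast with a clear message instead of failing later inside the binder.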

With everything prepared, we can register one bean per tenant. We do this in an ApplicationContextInitializer that creates the bean definitions via the factory method and iterates over the functions defined in the configuration file application.yaml:
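import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.regex.Pattern;

import com.google.common.base.CaseFormat;
import org.springframework.beans.factory.annotation.AnnotatedGenericBeanDefinition;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.web.context.support.GenericWebApplicationContext;

public class StreamProcessorInitializer 
    implements ApplicationContextInitializer<GenericWebApplicationContext> {

    @Override
    public void initialize(GenericWebApplicationContext context) {
        String functionDefinitions = context.getEnvironment()
            .getProperty("spring.cloud.function.definition", "");
        // Custom property (see application.yaml); fall back to ";" if it is not set
        String splitter = context.getEnvironment()
            .getProperty("spring.cloud.function.definition.splitter", ";");
        // Matches the name of the @Bean method myFunctionDefinitionFactory()
        String factoryName = CaseFormat.UPPER_CAMEL
            .to(CaseFormat.LOWER_CAMEL, MyFunctionDefinitionFactory.class.getSimpleName());
        // getMethods() also returns the methods inherited from Object, in no fixed
        // order, so pick the public method declared by the factory itself
        String factoryMethodName = Arrays.stream(MyFunctionDefinitionFactory.class.getMethods())
            .filter(m -> m.getDeclaringClass() == MyFunctionDefinitionFactory.class)
            .map(Method::getName)
            .findFirst()
            .orElseThrow(() -> new IllegalStateException("No factory method found"));

        // One bean definition per function name, all backed by the same factory method.
        // split() takes a regex, so the splitter is quoted.
        Arrays.stream(functionDefinitions.split(Pattern.quote(splitter)))
            .map(String::trim)
            .filter(function -> !function.isEmpty())
            .forEach(function -> {
                AnnotatedGenericBeanDefinition def =
                    new AnnotatedGenericBeanDefinition(Myfunctiondefinition.class);
                def.setFactoryBeanName(factoryName);
                def.setFactoryMethodName(factoryMethodName);
                context.registerBeanDefinition(function, def);
            });
    }
}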
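The parsing and registration loop can be exercised without a Spring context. In the JDK-only sketch below, a LinkedHashMap stands in for the bean registry and a Supplier stands in for the factory method; FunctionRegistrationSketch and its methods are hypothetical names for illustration, not Spring APIs. It also shows why the splitter goes through Pattern.quote: String.split treats its argument as a regular expression, so a splitter such as | would otherwise split between every character.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// JDK-only sketch of the initializer loop: a Map plays the bean registry,
// a Supplier plays the factory method. Names are hypothetical.
final class FunctionRegistrationSketch {

    private FunctionRegistrationSketch() {
    }

    // Quote the splitter (split() takes a regex), trim names and drop empty
    // entries such as the one produced by a trailing splitter.
    static List<String> parseFunctionNames(String definitions, String splitter) {
        if (definitions == null || definitions.isBlank()) {
            return List.of();
        }
        return Arrays.stream(definitions.split(Pattern.quote(splitter)))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    // Registers one entry per function name, each obtained from a separate
    // factory-method call; a duplicate name is rejected, much like a clashing
    // bean name would be a configuration error.
    static <I, O> Map<String, Function<I, O>> register(
            String definitions, String splitter, Supplier<Function<I, O>> factoryMethod) {
        Map<String, Function<I, O>> registry = new LinkedHashMap<>();
        for (String name : parseFunctionNames(definitions, splitter)) {
            if (registry.putIfAbsent(name, factoryMethod.get()) != null) {
                throw new IllegalArgumentException("Duplicate function name: " + name);
            }
        }
        return registry;
    }
}
```

Keeping the parsing in a plain static method like this also makes it easy to unit-test the splitter handling separately from the Spring wiring.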

Finally, the functions can be defined dynamically in application.yaml. The concrete tenant environments can then be configured with helm or kustomize:

#--------------------------------------------------------------------------------------------------------------------------------------
# streaming processor functions (going to be filled by helm)
#--------------------------------------------------------------------------------------------------------------------------------------
spring.cloud.function.definition: <name1>;<name2>;...
#--Note-- required because Spring Cloud Stream has changed the delimiter in the past
spring.cloud.function.definition.splitter: ;

# Properties per function (<name>)
spring.cloud.stream.kafka.streams.binder.functions.<name>.applicationId: ${tenant}-${spring.application.name}-<name>
# configuring dlq (if you have one)
spring.cloud.stream.kafka.streams.bindings.<name>-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.<name>-in-0.consumer.dlqName: ${tenant}-<name>-dlq
# configuring in- and output topics
spring.cloud.stream.bindings.<name>-in-0.destination: ${tenant}-<inputname>
spring.cloud.stream.bindings.<name>-out-0.destination: ${tenant}-<outputname>
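When helm or kustomize fills in this template, the same five keys repeat for every function name. The hypothetical renderer below (class, method and parameter names are illustrative; the key layout is copied from the template above) makes that expansion explicit, for example to generate a values file or to assert in a test that every function gets its bindings. It assumes, like the template, a single tenant value per deployment.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: expands the per-function template for one tenant.
final class TenantPropertyRenderer {

    private TenantPropertyRenderer() {
    }

    // Value for spring.cloud.function.definition, joined with the ";" splitter
    static String definition(List<String> functionNames) {
        return String.join(";", functionNames);
    }

    // The five keys the template defines for a single function <name>
    static Map<String, String> render(String tenant, String appName, String function,
                                      String inputName, String outputName) {
        String streams = "spring.cloud.stream.kafka.streams.";
        String in = function + "-in-0";
        String out = function + "-out-0";
        Map<String, String> props = new LinkedHashMap<>();
        props.put(streams + "binder.functions." + function + ".applicationId",
                tenant + "-" + appName + "-" + function);
        // DLQ settings, only needed if a dead-letter topic is used
        props.put(streams + "bindings." + in + ".consumer.deserializationExceptionHandler", "sendToDlq");
        props.put(streams + "bindings." + in + ".consumer.dlqName", tenant + "-" + function + "-dlq");
        // Input and output topics
        props.put("spring.cloud.stream.bindings." + in + ".destination", tenant + "-" + inputName);
        props.put("spring.cloud.stream.bindings." + out + ".destination", tenant + "-" + outputName);
        return props;
    }
}
```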