Spring-作为 Spring-Cloud-Data-Flow 流的一部分部署的 Cloud-Function 应用程序的 Kafka 主题名称错误
Wrong Kafka topic names for Spring-Cloud-Function app deployed as part of Spring-Cloud-Data-Flow stream
我有一个简单的 SCDF 流,如下所示:
http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp
mvmn-transform 是一个简单的自定义转换器,如下所示:
@SpringBootApplication
@EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
public static void main(String args[]) {
SpringApplication.run(ScdfTestTransformer.class, args);
}
@Autowired
protected ScdfTestTransformerProperties config;
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(Message<?> message) {
Object payload = message.getPayload();
Map<String, Object> result = new HashMap<>();
Map<String, String> headersStr = new HashMap<>();
message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));
result.put("headers", headersStr);
result.put("payload", payload);
result.put("configProp", config.getSomeConfigProp());
return result;
}
// See
@Bean("kafkaBinderHeaderMapper")
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
BinderHeaderMapper mapper = new BinderHeaderMapper();
mapper.setEncodeStrings(true);
return mapper;
}
}
这很好用。
但我读到 Spring Cloud Function 应该允许我实现此类应用程序而无需指定绑定和转换器注释,因此我将其更改为:
@SpringBootApplication
// @EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
public static void main(String args[]) {
SpringApplication.run(ScdfTestTransformer.class, args);
}
@Autowired
protected ScdfTestTransformerProperties config;
// @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
@Bean
public Function<Message<?>, Map<String, Object>> transform(
// Message<?> message
) {
return message -> {
Object payload = message.getPayload();
Map<String, Object> result = new HashMap<>();
Map<String, String> headersStr = new HashMap<>();
message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));
result.put("headers", headersStr);
result.put("payload", payload);
result.put("configProp", "Config prop val: " + config.getSomeConfigProp());
return result;
};
}
// See
@Bean("kafkaBinderHeaderMapper")
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
BinderHeaderMapper mapper = new BinderHeaderMapper();
mapper.setEncodeStrings(true);
return mapper;
}
}
现在我遇到了一个问题 - Spring-Cloud-Function 显然忽略了 SCDF 源和目标主题名称,而是创建了主题 transform-in-0
和 transform-out-0
。
SCDF 创建名称类似于 <stream-name>.<app-name>
的主题,例如 TestStream123.http
和 TestStream123.mvmn-transform
以前它们被用于变压器——它应该是,因为它是 SCDF 流的一部分。但现在它们被 Spring-Cloud-Function 忽略,而是创建了 transform-in-0
和 transform-out-0
。
因此我的转换器不再接收任何输入,因为它在错误的 Kafka 主题上期望它。并且可能也不会对流产生任何输出,因为它也会输出到错误的 Kafka 主题。
P.S。以防万一,GitHub 上的完整项目代码:https://github.com/mvmn/scdftest-transformer/tree/scfunc
为了 运行 在本地启动 Kafka、Skipper、SCDF 和 SCDF 控制台,在应用程序文件夹中执行 mvn clean install
,然后在控制台中执行 app register --name mvmn-transform-1 --type processor --uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT --metadata-uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT
。然后您可以使用定义 http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp
部署流
由于您使用的是编写Spring Cloud Stream 应用程序的功能模型,因此在部署此应用程序时,您需要在自定义处理器上传递两个属性以恢复Spring Cloud Data Flow行为。
spring.cloud.stream.function.bindings.transform-in-0=input
spring.cloud.stream.function.bindings.transform-out-0=output
你能试试看有什么不同吗?
我有一个简单的 SCDF 流,如下所示:
http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp
mvmn-transform 是一个简单的自定义转换器,如下所示:
@SpringBootApplication
@EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
public static void main(String args[]) {
SpringApplication.run(ScdfTestTransformer.class, args);
}
@Autowired
protected ScdfTestTransformerProperties config;
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(Message<?> message) {
Object payload = message.getPayload();
Map<String, Object> result = new HashMap<>();
Map<String, String> headersStr = new HashMap<>();
message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));
result.put("headers", headersStr);
result.put("payload", payload);
result.put("configProp", config.getSomeConfigProp());
return result;
}
// See
@Bean("kafkaBinderHeaderMapper")
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
BinderHeaderMapper mapper = new BinderHeaderMapper();
mapper.setEncodeStrings(true);
return mapper;
}
}
这很好用。
但我读到 Spring Cloud Function 应该允许我实现此类应用程序而无需指定绑定和转换器注释,因此我将其更改为:
@SpringBootApplication
// @EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
public static void main(String args[]) {
SpringApplication.run(ScdfTestTransformer.class, args);
}
@Autowired
protected ScdfTestTransformerProperties config;
// @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
@Bean
public Function<Message<?>, Map<String, Object>> transform(
// Message<?> message
) {
return message -> {
Object payload = message.getPayload();
Map<String, Object> result = new HashMap<>();
Map<String, String> headersStr = new HashMap<>();
message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));
result.put("headers", headersStr);
result.put("payload", payload);
result.put("configProp", "Config prop val: " + config.getSomeConfigProp());
return result;
};
}
// See
@Bean("kafkaBinderHeaderMapper")
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
BinderHeaderMapper mapper = new BinderHeaderMapper();
mapper.setEncodeStrings(true);
return mapper;
}
}
现在我遇到了一个问题 - Spring-Cloud-Function 显然忽略了 SCDF 源和目标主题名称,而是创建了主题 transform-in-0
和 transform-out-0
。
SCDF 创建名称类似于 <stream-name>.<app-name>
的主题,例如 TestStream123.http
和 TestStream123.mvmn-transform
以前它们被用于变压器——它应该是,因为它是 SCDF 流的一部分。但现在它们被 Spring-Cloud-Function 忽略,而是创建了 transform-in-0
和 transform-out-0
。
因此我的转换器不再接收任何输入,因为它在错误的 Kafka 主题上期望它。并且可能也不会对流产生任何输出,因为它也会输出到错误的 Kafka 主题。
P.S。以防万一,GitHub 上的完整项目代码:https://github.com/mvmn/scdftest-transformer/tree/scfunc
为了 运行 在本地启动 Kafka、Skipper、SCDF 和 SCDF 控制台,在应用程序文件夹中执行 mvn clean install
,然后在控制台中执行 app register --name mvmn-transform-1 --type processor --uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT --metadata-uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT
。然后您可以使用定义 http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp
由于您使用的是编写Spring Cloud Stream 应用程序的功能模型,因此在部署此应用程序时,您需要在自定义处理器上传递两个属性以恢复Spring Cloud Data Flow行为。
spring.cloud.stream.function.bindings.transform-in-0=input
spring.cloud.stream.function.bindings.transform-out-0=output
你能试试看有什么不同吗?