无法 运行 只是这个简单的流,...需要 serde 配置吗?
Unable to run just this simple stream,... serde configuration needed?
是的,我已经阅读了我找到的所有文档并尝试了所有替代配置,但是这个应该记录一行的简单示例不起作用
(这是一个带有 spring-cloud-stream-binder-kafka-streams 的 Spring-Boot-2 应用程序)
Kafka 正在存储一个字符串值(空键)
我的apllication.yaml
spring:
cloud:
stream:
bindings:
input:
destination: 'myStreamTopic'
output:
producer.keySerde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
kafka:
streams:
binder:
configuration:
default.key.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
default.value.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
brokers:
- 'ommited:9092'
- 'ommited:9092'
- 'ommited:9092'
application-id: hack1
只是这个简单的代码作为 POC:
@SpringBootApplication
@Slf4j
public class HackatonApplication {
public static void main(String[] args) {
SpringApplication.run(HackatonApplication.class, args);
}
@EnableBinding(KafkaStreamsProcessor.class)
public static class LineProcessor {
@StreamListener(Sink.INPUT)
public void process(KStream<?, String> line) {
log.info("Received: {}", line);
}
}
我无法得到它运行!
org.springframework.context.ApplicationContextException: 启动bean失败'outputBindingLifecycle';嵌套异常是 java.lang.IllegalArgumentException:尝试调用 public 抽象 org.apache.kafka.streams.kstream.KStream org.apache.kafka.streams.kstream.KStream.map(org.apache.kafka.streams.kstream.KeyValueMapper) 但未设置委托。
在 org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:184) ~[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
在 org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:52) ~[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
在
抱歉,如果这是微不足道的,但我已经花了几个小时搜索、谷歌搜索并试图找到记录在案的解决方案。
您正在使用开箱即用的 KafkaStreamsProcessor
进行绑定,它需要一个 KStream
作为输入,另一个 KStream
作为输出。如果你使用这个标准的,你必须为输出绑定提供正确的配置(比如目的地等)。那么你的方法必须 return 一个 KStream
并且你需要在 spring 端使用 SendTo
注释来绑定。如下所示:
@StreamListener(Sink.INPUT)
@SendTo("output")
public KStream<?,String> process(KStream<?, String> line) {
log.info("Received: {}", line);
return line;
}
但是,对于您的情况,您可以使用自定义处理器并将其用于 EnableBinding
。
interface CustomKafkaStreamsProcessor {
@Input("input")
KStream<?, ?> input();
}
然后将其与您的绑定一起使用。 @EnableBinding(CustomKafkaStreamsProcessor.class)
。
这样一来,您就不必将方法更改为return。
是的,我已经阅读了我找到的所有文档并尝试了所有替代配置,但是这个应该记录一行的简单示例不起作用
(这是一个带有 spring-cloud-stream-binder-kafka-streams 的 Spring-Boot-2 应用程序)
Kafka 正在存储一个字符串值(空键)
我的apllication.yaml
spring:
cloud:
stream:
bindings:
input:
destination: 'myStreamTopic'
output:
producer.keySerde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
kafka:
streams:
binder:
configuration:
default.key.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
default.value.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
brokers:
- 'ommited:9092'
- 'ommited:9092'
- 'ommited:9092'
application-id: hack1
只是这个简单的代码作为 POC:
@SpringBootApplication
@Slf4j
public class HackatonApplication {
public static void main(String[] args) {
SpringApplication.run(HackatonApplication.class, args);
}
@EnableBinding(KafkaStreamsProcessor.class)
public static class LineProcessor {
@StreamListener(Sink.INPUT)
public void process(KStream<?, String> line) {
log.info("Received: {}", line);
}
}
我无法得到它运行!
org.springframework.context.ApplicationContextException: 启动bean失败'outputBindingLifecycle';嵌套异常是 java.lang.IllegalArgumentException:尝试调用 public 抽象 org.apache.kafka.streams.kstream.KStream org.apache.kafka.streams.kstream.KStream.map(org.apache.kafka.streams.kstream.KeyValueMapper) 但未设置委托。 在 org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:184) ~[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE] 在 org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:52) ~[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE] 在
抱歉,如果这是微不足道的,但我已经花了几个小时搜索、谷歌搜索并试图找到记录在案的解决方案。
您正在使用开箱即用的 KafkaStreamsProcessor
进行绑定,它需要一个 KStream
作为输入,另一个 KStream
作为输出。如果你使用这个标准的,你必须为输出绑定提供正确的配置(比如目的地等)。那么你的方法必须 return 一个 KStream
并且你需要在 spring 端使用 SendTo
注释来绑定。如下所示:
@StreamListener(Sink.INPUT)
@SendTo("output")
public KStream<?,String> process(KStream<?, String> line) {
log.info("Received: {}", line);
return line;
}
但是,对于您的情况,您可以使用自定义处理器并将其用于 EnableBinding
。
interface CustomKafkaStreamsProcessor {
@Input("input")
KStream<?, ?> input();
}
然后将其与您的绑定一起使用。 @EnableBinding(CustomKafkaStreamsProcessor.class)
。
这样一来,您就不必将方法更改为return。