接收器组件无法在 spring 云数据流中使用 kafka 获取正确的数据
Sink component doesn't get the right data with kafka in spring cloud data flow
我的母语不是英语,但我会尽可能清楚地表达我的问题。
遇到这个问题困扰了我两天了,还是找不到解决办法
我已经构建了一个流,它将 运行 在 Hadoop YARN 中的 Spring 数据流中。
流由Http源、处理器和文件接收器组成。
1.Http 来源
HTTP Source 组件有两个输出通道绑定两个不同的目的地,它们是 application.properties.
中定义的 dest1 和 dest2
spring.cloud.stream.bindings.output.destination=dest1
spring.cloud.stream.bindings.output2.destination=dest2
下面是HTTP源代码片段供大家参考..
@Autowired
private EssSource channels; //EssSource is the interface for multiple output channels
##output channel 1:
@RequestMapping(path = "/file", method = POST, consumes = {"text/*", "application/json"})
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
logger.info("enter ... handleRequest1...");
channels.output().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
##output channel 2:
@RequestMapping(path = "/test", method = POST, consumes = {"text/*", "application/json"})
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest2(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
logger.info("enter ... handleRequest2...");
channels.output2().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
2。处理器
该处理器有两个多输入通道和两个与不同目的地绑定的输出通道。
目标绑定在处理器组件项目的 application.properties 中定义。
//input channel binding
spring.cloud.stream.bindings.input.destination=dest1
spring.cloud.stream.bindings.input2.destination=dest2
//output channel binding
spring.cloud.stream.bindings.output.destination=hdfsSink
spring.cloud.stream.bindings.output2.destination=fileSink
下面是处理器的代码片段。
@Transformer(inputChannel = EssProcessor.INPUT, outputChannel = EssProcessor.OUTPUT)
public Object transform(Message<?> message) {
logger.info("enter ...transform...");
return "processed by transform1";;
}
@Transformer(inputChannel = EssProcessor.INPUT_2, outputChannel = EssProcessor.OUTPUT_2)
public Object transform2(Message<?> message) {
logger.info("enter ... transform2...");
return "processed by transform2";
}
3。文件接收器组件。
我使用 Spring 的官方 fil sink 组件。
maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT
我只是在其 applicaiton.properties 文件中添加目标绑定。
spring.cloud.stream.bindings.input.destination=fileSink
4.Finding:
我期望的数据流应该是这样的:
Source.handleRequest() -->Processor.handleRequest()
Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();
应该只将字符串 "processed by transform2" 保存到文件中。
但是经过我的测试,数据流是这样的:
Source.handleRequest() -->Processor.handleRequest() --> Sink.fileWritingMessageHandler();
Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();
"processed by transform1" 和 "processed by transform2" 字符串都保存到文件中。
5.Question:
虽然 Processor.handleRequest() 中输出通道的目标绑定到 hdfsSink 而不是 fileSink,数据仍然流向文件 Sink。我无法理解这一点,这不是我想要的。
我只希望来自 Processor.handleRequest2() 的数据流向文件接收器而不是两者。
如果我做的不对,谁能告诉我该怎么做以及解决方案是什么?
迷惑了我2天
感谢您的热心帮助。
亚历克斯
您的流定义是这样的吗(其中“-2”版本是具有多个频道的版本)?
http-source-2 | processor-2 | file-sink
请注意,Spring 云数据流将覆盖 applications.properties
中定义的目的地,这就是为什么,即使处理器的 spring.cloud.stream.bindings.output.destination
设置为 hdfs-sink
,它实际上会匹配 file-sink
.
的输入
从流定义配置目的地的方式在此处解释(在 taps 的上下文中):http://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#spring-cloud-dataflow-stream-tap-dsl
您可以做的是简单地交换通道 1 和 2 的含义——将侧通道用于 hdfs。虽然这有点脆弱 - 因为 Stream 的 input
/output
通道将自动配置,其他通道将通过 application.properties
配置 - 在这种情况下可能更好通过流定义或在部署时配置侧信道目的地 - 请参阅 http://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#_application_properties.
在我看来,这些也可能是 2 个流,使用常规组件监听不同的端点 - 假设数据应该并排流动。
我的母语不是英语,但我会尽可能清楚地表达我的问题。 遇到这个问题困扰了我两天了,还是找不到解决办法
我已经构建了一个流,它将 运行 在 Hadoop YARN 中的 Spring 数据流中。
流由Http源、处理器和文件接收器组成。
1.Http 来源
HTTP Source 组件有两个输出通道绑定两个不同的目的地,它们是 application.properties.
spring.cloud.stream.bindings.output.destination=dest1 spring.cloud.stream.bindings.output2.destination=dest2
下面是HTTP源代码片段供大家参考..
@Autowired
private EssSource channels; //EssSource is the interface for multiple output channels
##output channel 1:
@RequestMapping(path = "/file", method = POST, consumes = {"text/*", "application/json"})
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
logger.info("enter ... handleRequest1...");
channels.output().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
##output channel 2:
@RequestMapping(path = "/test", method = POST, consumes = {"text/*", "application/json"})
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest2(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
logger.info("enter ... handleRequest2...");
channels.output2().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
2。处理器
该处理器有两个多输入通道和两个与不同目的地绑定的输出通道。
目标绑定在处理器组件项目的 application.properties 中定义。
//input channel binding
spring.cloud.stream.bindings.input.destination=dest1
spring.cloud.stream.bindings.input2.destination=dest2
//output channel binding
spring.cloud.stream.bindings.output.destination=hdfsSink
spring.cloud.stream.bindings.output2.destination=fileSink
下面是处理器的代码片段。
@Transformer(inputChannel = EssProcessor.INPUT, outputChannel = EssProcessor.OUTPUT)
public Object transform(Message<?> message) {
logger.info("enter ...transform...");
return "processed by transform1";;
}
@Transformer(inputChannel = EssProcessor.INPUT_2, outputChannel = EssProcessor.OUTPUT_2)
public Object transform2(Message<?> message) {
logger.info("enter ... transform2...");
return "processed by transform2";
}
3。文件接收器组件。
我使用 Spring 的官方 fil sink 组件。 maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT
我只是在其 applicaiton.properties 文件中添加目标绑定。 spring.cloud.stream.bindings.input.destination=fileSink
4.Finding:
我期望的数据流应该是这样的:
Source.handleRequest() -->Processor.handleRequest()
Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();
应该只将字符串 "processed by transform2" 保存到文件中。
但是经过我的测试,数据流是这样的:
Source.handleRequest() -->Processor.handleRequest() --> Sink.fileWritingMessageHandler();
Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();
"processed by transform1" 和 "processed by transform2" 字符串都保存到文件中。
5.Question:
虽然 Processor.handleRequest() 中输出通道的目标绑定到 hdfsSink 而不是 fileSink,数据仍然流向文件 Sink。我无法理解这一点,这不是我想要的。 我只希望来自 Processor.handleRequest2() 的数据流向文件接收器而不是两者。 如果我做的不对,谁能告诉我该怎么做以及解决方案是什么? 迷惑了我2天
感谢您的热心帮助。
亚历克斯
您的流定义是这样的吗(其中“-2”版本是具有多个频道的版本)?
http-source-2 | processor-2 | file-sink
请注意,Spring 云数据流将覆盖 applications.properties
中定义的目的地,这就是为什么,即使处理器的 spring.cloud.stream.bindings.output.destination
设置为 hdfs-sink
,它实际上会匹配 file-sink
.
从流定义配置目的地的方式在此处解释(在 taps 的上下文中):http://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#spring-cloud-dataflow-stream-tap-dsl
您可以做的是简单地交换通道 1 和 2 的含义——将侧通道用于 hdfs。虽然这有点脆弱 - 因为 Stream 的 input
/output
通道将自动配置,其他通道将通过 application.properties
配置 - 在这种情况下可能更好通过流定义或在部署时配置侧信道目的地 - 请参阅 http://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#_application_properties.
在我看来,这些也可能是 2 个流,使用常规组件监听不同的端点 - 假设数据应该并排流动。