了解 Spring 云流内容类型
Understanding Spring Cloud Stream content types
我写了一个类似于 的自定义处理器。特别是,处理器采用 InputDto 和 returns json。按照问答中的指导,我的自定义处理器有一个包含以下内容的 application.properties 文件:
spring.cloud.stream.bindings.input.content-type=application/x-java-object;com.company.InputDto
spring.cloud.stream.bindings.output.content-type=application/json
从 我用这一行创建了一个 spring.integration.properties
文件:
spring.integration.readOnly.headers=contentType
而且我正在进行自动化集成测试。到目前为止一切顺利。
我在 SCDF shell 中创建了一个流,包括我的处理器。 time
和 httpclient
运行良好,所以我没有显示它们的详细参数。为了便于阅读,在此处和整个过程中添加了换行符。
stream create --name test --definition "time <args>
| httpclient <args>
| splitter --expression=\"#jsonPath(payload,'$..[0:]')\"
| myprocessor
| log"
我启用了调试日志记录。 httpclient
生成具有 contentType=text/plain
和有效负载的消息:
payload=[{"name":"first","url":"url1"},{"name":"second","url":"url2"}]
splitter
根据日志(如预期)创建两条这样的消息:
message: GenericMessage [payload={name=first, url=url1},
headers={...contentType=text/plain, ... }]
message: GenericMessage [payload={name=second, url=url2},
headers={...contentType=text/plain, ... }]
而我编写的自定义处理器因以下异常而失败:
org.springframework.messaging.converter.MessageConversionException: Cannot
convert from [java.util.LinkedHashMap] to [com.company.InputDto] for
GenericMessage [payload={name=first, url=url1}, headers={sequenceNumber=1,
kafka_offset=xx, sequenceSize=x, correlationId=yy, id=zzz,
kafka_receivedPartitionId=0, contentType=text/plain,
kafka_receivedTopic=test.splitter, timestamp=1503591622851}]
at
org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142) ~[spring-messaging-4.3.10.RELEASE.jar!/:4.3.10.RELEASE]
....
我不确定 LinkedHashMap
来自哪里。我尝试将 application.properties 文件更改为:
spring.cloud.stream.bindings.input.content-type=application/json;com.company.InputDto
但没有帮助。我也尝试添加
--spring.cloud.stream.bindings.output.contentType='application/json'
创建流时splitter
(按照sample app中的说明),但仍然出现异常。
我花了好几个小时,就是看不出我错过了什么。感谢任何帮助。
我的自定义处理器使用 Spring Cloud Dalston.SR3。我正在使用 SCDF 服务器和 shell 1.3.0.M1。使用 Kafka 活页夹。
更新,更多信息。我查看了 Splitter starter 中的代码并编写了一个小测试用例来模拟它在做什么。
final ExpressionEvaluatingSplitter splitter = new ExpressionEvaluatingSplitter(
new SpelExpressionParser().parseExpression(
"T(com.jayway.jsonpath.JsonPath).read(payload, '$..[0:]')"));
final PollableChannel channel = new QueueChannel();
splitter.setOutputChannel(channel);
splitter.handleMessage(new GenericMessage<Object>(
"[{\"name\":\"first\",\"url\":\"url1\"},
{\"name\":\"second\",\"url\":\"url2\"}]"));
final Message<?> message = channel.receive(10);
System.out.println("payload class: " + message.getPayload().getClass());
System.out.println("payload: " + message.getPayload());
System.out.println(channel.receive(10));
这会产生输出:
payload class: class java.util.LinkedHashMap
payload: {name=first, url=url1}
GenericMessage [payload={name=second, url=url2}, headers={sequenceNumber=2,
correlationId=xx, id=yy, sequenceSize=2, timestamp=1503609649518}]
啊哈,LinkedHashMap
!现在我只需要说服拆分器将输出作为纯文本或 json,而不是地图发送。
更新 2。我已经能够在不使用任何自定义处理器的情况下重现此问题。
stream create --name test --definition "time <args>
| httpclient <args>
| splitter --expression=\"#jsonPath(payload,'$.[0:]')\"
--outputType=\"application/json\"
| transform --expression=\"new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(payload) != null\"
--outputType=\"application/json\"
| log"
当 运行 时,拆分器日志文件包含此异常(删节),臭名昭著的 "payload must not be null" 错误:
2017-08-25 13:09:30,322 DEBUG -kafka-listener-2 o.s.i.c.DirectChannel:411 - preSend on channel 'output', message: GenericMessage [payload={name=first, url=url1}, headers={sequenceNumber=1, kafka_offset=xx sequenceSize=2, correlationId=yy, id=zz, kafka_receivedPartitionId=0, contentType=text/plain, kafka_receivedTopic=test.httpclient, timestamp=1503680970322}]
2017-08-25 13:09:30,328 ERROR -kafka-listener-2 o.s.k.l.LoggingErrorHandler:37 - Error while processing: ConsumerRecord(topic = test.httpclient, partition = 0, offset = 52606, CreateTime = 1503680967030, checksum = 2166925613, serialized key size = -1, serialized value size = 470, key = null, value = [B@6567451e)
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalArgumentException: payload must not be null
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449) ~[spring-integration-core-.3.8.RELEASE.jar!/:4.3.8.RELEASE]
看起来拆分器在将 LinkedHashMap 转换为 JSON 时遇到困难。有什么方法可以实现转换?
我还尝试将 httpclient 处理器的 outputType 设置为显式 application/json
,但似乎没有什么不同。 (在 docs 之后。示例显示带有 shell 引号的 outputType 值,我也尝试过不带引号,没有区别。)
应用程序使用命令加载到 SCDF 服务器(替换为“.”,所以 SO 会接受 link)
app import --uri http://bit-ly/Bacon-RELEASE-stream-applications-kafka-10-maven
又注意到一些事情。调试日志显示内容类型始终为 text/plain
,因此它似乎没有选择我明确设置的内容类型。此外,如果我删除 transform
处理器,错误就会消失。我在日志中看到数据,但不是 JSON 格式,只是这样:
{name=first, url=url1}
您能否像您所说的那样尝试设置拆分器的输出内容类型,并删除输入处理器的任何 contentType 定义。 JSON->POJO 应该是从消息头中自动生成的。
通过显式设置 inputType 和 outputType,我能够使它同时适用于纯 Spring 流和包含我的自定义处理器的流。
stream create --name test --definition "time
| httpclient <args> --outputType=\"application/json\"
| splitter --expression=\"#jsonPath(payload,'$.[0:]')\"
--inputType=\"application/json\"
--outputType=\"application/json\"
| myprocessor
--inputType=\"application/json\"
--outputType=\"application/json\"
| log"
Gary Russell 提到此问题已在 Spring Cloud Stream Ditmars.M2 (1.3.0.M2) 中得到纠正。
我写了一个类似于
spring.cloud.stream.bindings.input.content-type=application/x-java-object;com.company.InputDto
spring.cloud.stream.bindings.output.content-type=application/json
从 spring.integration.properties
文件:
spring.integration.readOnly.headers=contentType
而且我正在进行自动化集成测试。到目前为止一切顺利。
我在 SCDF shell 中创建了一个流,包括我的处理器。 time
和 httpclient
运行良好,所以我没有显示它们的详细参数。为了便于阅读,在此处和整个过程中添加了换行符。
stream create --name test --definition "time <args>
| httpclient <args>
| splitter --expression=\"#jsonPath(payload,'$..[0:]')\"
| myprocessor
| log"
我启用了调试日志记录。 httpclient
生成具有 contentType=text/plain
和有效负载的消息:
payload=[{"name":"first","url":"url1"},{"name":"second","url":"url2"}]
splitter
根据日志(如预期)创建两条这样的消息:
message: GenericMessage [payload={name=first, url=url1},
headers={...contentType=text/plain, ... }]
message: GenericMessage [payload={name=second, url=url2},
headers={...contentType=text/plain, ... }]
而我编写的自定义处理器因以下异常而失败:
org.springframework.messaging.converter.MessageConversionException: Cannot
convert from [java.util.LinkedHashMap] to [com.company.InputDto] for
GenericMessage [payload={name=first, url=url1}, headers={sequenceNumber=1,
kafka_offset=xx, sequenceSize=x, correlationId=yy, id=zzz,
kafka_receivedPartitionId=0, contentType=text/plain,
kafka_receivedTopic=test.splitter, timestamp=1503591622851}]
at
org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142) ~[spring-messaging-4.3.10.RELEASE.jar!/:4.3.10.RELEASE]
....
我不确定 LinkedHashMap
来自哪里。我尝试将 application.properties 文件更改为:
spring.cloud.stream.bindings.input.content-type=application/json;com.company.InputDto
但没有帮助。我也尝试添加
--spring.cloud.stream.bindings.output.contentType='application/json'
创建流时splitter
(按照sample app中的说明),但仍然出现异常。
我花了好几个小时,就是看不出我错过了什么。感谢任何帮助。
我的自定义处理器使用 Spring Cloud Dalston.SR3。我正在使用 SCDF 服务器和 shell 1.3.0.M1。使用 Kafka 活页夹。
更新,更多信息。我查看了 Splitter starter 中的代码并编写了一个小测试用例来模拟它在做什么。
final ExpressionEvaluatingSplitter splitter = new ExpressionEvaluatingSplitter(
new SpelExpressionParser().parseExpression(
"T(com.jayway.jsonpath.JsonPath).read(payload, '$..[0:]')"));
final PollableChannel channel = new QueueChannel();
splitter.setOutputChannel(channel);
splitter.handleMessage(new GenericMessage<Object>(
"[{\"name\":\"first\",\"url\":\"url1\"},
{\"name\":\"second\",\"url\":\"url2\"}]"));
final Message<?> message = channel.receive(10);
System.out.println("payload class: " + message.getPayload().getClass());
System.out.println("payload: " + message.getPayload());
System.out.println(channel.receive(10));
这会产生输出:
payload class: class java.util.LinkedHashMap
payload: {name=first, url=url1}
GenericMessage [payload={name=second, url=url2}, headers={sequenceNumber=2,
correlationId=xx, id=yy, sequenceSize=2, timestamp=1503609649518}]
啊哈,LinkedHashMap
!现在我只需要说服拆分器将输出作为纯文本或 json,而不是地图发送。
更新 2。我已经能够在不使用任何自定义处理器的情况下重现此问题。
stream create --name test --definition "time <args>
| httpclient <args>
| splitter --expression=\"#jsonPath(payload,'$.[0:]')\"
--outputType=\"application/json\"
| transform --expression=\"new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(payload) != null\"
--outputType=\"application/json\"
| log"
当 运行 时,拆分器日志文件包含此异常(删节),臭名昭著的 "payload must not be null" 错误:
2017-08-25 13:09:30,322 DEBUG -kafka-listener-2 o.s.i.c.DirectChannel:411 - preSend on channel 'output', message: GenericMessage [payload={name=first, url=url1}, headers={sequenceNumber=1, kafka_offset=xx sequenceSize=2, correlationId=yy, id=zz, kafka_receivedPartitionId=0, contentType=text/plain, kafka_receivedTopic=test.httpclient, timestamp=1503680970322}]
2017-08-25 13:09:30,328 ERROR -kafka-listener-2 o.s.k.l.LoggingErrorHandler:37 - Error while processing: ConsumerRecord(topic = test.httpclient, partition = 0, offset = 52606, CreateTime = 1503680967030, checksum = 2166925613, serialized key size = -1, serialized value size = 470, key = null, value = [B@6567451e)
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalArgumentException: payload must not be null
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449) ~[spring-integration-core-.3.8.RELEASE.jar!/:4.3.8.RELEASE]
看起来拆分器在将 LinkedHashMap 转换为 JSON 时遇到困难。有什么方法可以实现转换?
我还尝试将 httpclient 处理器的 outputType 设置为显式 application/json
,但似乎没有什么不同。 (在 docs 之后。示例显示带有 shell 引号的 outputType 值,我也尝试过不带引号,没有区别。)
应用程序使用命令加载到 SCDF 服务器(替换为“.”,所以 SO 会接受 link)
app import --uri http://bit-ly/Bacon-RELEASE-stream-applications-kafka-10-maven
又注意到一些事情。调试日志显示内容类型始终为 text/plain
,因此它似乎没有选择我明确设置的内容类型。此外,如果我删除 transform
处理器,错误就会消失。我在日志中看到数据,但不是 JSON 格式,只是这样:
{name=first, url=url1}
您能否像您所说的那样尝试设置拆分器的输出内容类型,并删除输入处理器的任何 contentType 定义。 JSON->POJO 应该是从消息头中自动生成的。
通过显式设置 inputType 和 outputType,我能够使它同时适用于纯 Spring 流和包含我的自定义处理器的流。
stream create --name test --definition "time
| httpclient <args> --outputType=\"application/json\"
| splitter --expression=\"#jsonPath(payload,'$.[0:]')\"
--inputType=\"application/json\"
--outputType=\"application/json\"
| myprocessor
--inputType=\"application/json\"
--outputType=\"application/json\"
| log"
Gary Russell 提到此问题已在 Spring Cloud Stream Ditmars.M2 (1.3.0.M2) 中得到纠正。