了解 Spring 云流内容类型

Understanding Spring Cloud Stream content types

我写了一个类似于 的自定义处理器。特别是,处理器采用 InputDto 和 returns json。按照问答中的指导,我的自定义处理器有一个包含以下内容的 application.properties 文件:

spring.cloud.stream.bindings.input.content-type=application/x-java-object;com.company.InputDto
spring.cloud.stream.bindings.output.content-type=application/json

我用这一行创建了一个 spring.integration.properties 文件:

spring.integration.readOnly.headers=contentType

而且我正在进行自动化集成测试。到目前为止一切顺利。

我在 SCDF shell 中创建了一个流,包括我的处理器。 timehttpclient 运行良好,所以我没有显示它们的详细参数。为了便于阅读,在此处和整个过程中添加了换行符。

stream create --name test --definition "time <args> 
  | httpclient <args> 
  | splitter --expression=\"#jsonPath(payload,'$..[0:]')\" 
  | myprocessor 
  | log"

我启用了调试日志记录。 httpclient 生成具有 contentType=text/plain 和有效负载的消息:

payload=[{"name":"first","url":"url1"},{"name":"second","url":"url2"}]

splitter 根据日志(如预期)创建两条这样的消息:

message: GenericMessage [payload={name=first, url=url1}, 
  headers={...contentType=text/plain, ... }]

message: GenericMessage [payload={name=second, url=url2}, 
  headers={...contentType=text/plain, ... }]

而我编写的自定义处理器因以下异常而失败:

org.springframework.messaging.converter.MessageConversionException: Cannot 
convert from [java.util.LinkedHashMap] to [com.company.InputDto] for 
GenericMessage [payload={name=first, url=url1}, headers={sequenceNumber=1, 
   kafka_offset=xx, sequenceSize=x, correlationId=yy, id=zzz, 
   kafka_receivedPartitionId=0, contentType=text/plain, 
   kafka_receivedTopic=test.splitter, timestamp=1503591622851}]
    at 
 org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142) ~[spring-messaging-4.3.10.RELEASE.jar!/:4.3.10.RELEASE] 
....

我不确定 LinkedHashMap 来自哪里。我尝试将 application.properties 文件更改为:

spring.cloud.stream.bindings.input.content-type=application/json;com.company.InputDto

但没有帮助。我也尝试添加

--spring.cloud.stream.bindings.output.contentType='application/json'

创建流时splitter(按照sample app中的说明),但仍然出现异常。

我花了好几个小时,就是看不出我错过了什么。感谢任何帮助。

我的自定义处理器使用 Spring Cloud Dalston.SR3。我正在使用 SCDF 服务器和 shell 1.3.0.M1。使用 Kafka 活页夹。


更新,更多信息。我查看了 Splitter starter 中的代码并编写了一个小测试用例来模拟它在做什么。

final ExpressionEvaluatingSplitter splitter = new ExpressionEvaluatingSplitter(
    new SpelExpressionParser().parseExpression(
        "T(com.jayway.jsonpath.JsonPath).read(payload, '$..[0:]')"));
final PollableChannel channel = new QueueChannel();
splitter.setOutputChannel(channel);
splitter.handleMessage(new GenericMessage<Object>(
  "[{\"name\":\"first\",\"url\":\"url1\"},
    {\"name\":\"second\",\"url\":\"url2\"}]"));
final Message<?> message = channel.receive(10);
System.out.println("payload class: " + message.getPayload().getClass());
System.out.println("payload: " + message.getPayload());
System.out.println(channel.receive(10));

这会产生输出:

payload class: class java.util.LinkedHashMap
payload: {name=first, url=url1}
GenericMessage [payload={name=second, url=url2}, headers={sequenceNumber=2,
  correlationId=xx, id=yy, sequenceSize=2, timestamp=1503609649518}]

啊哈,LinkedHashMap!现在我只需要说服拆分器将输出作为纯文本或 json,而不是地图发送。


更新 2。我已经能够在不使用任何自定义处理器的情况下重现此问题。

stream create --name test --definition "time <args>
  | httpclient <args>
  | splitter --expression=\"#jsonPath(payload,'$.[0:]')\" 
         --outputType=\"application/json\"
  | transform --expression=\"new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(payload) != null\" 
         --outputType=\"application/json\"
  | log"

当 运行 时,拆分器日志文件包含此异常(删节),臭名昭著的 "payload must not be null" 错误:

2017-08-25 13:09:30,322 DEBUG -kafka-listener-2 o.s.i.c.DirectChannel:411 - preSend on channel 'output', message: GenericMessage [payload={name=first, url=url1}, headers={sequenceNumber=1, kafka_offset=xx sequenceSize=2, correlationId=yy, id=zz, kafka_receivedPartitionId=0, contentType=text/plain, kafka_receivedTopic=test.httpclient, timestamp=1503680970322}]
2017-08-25 13:09:30,328 ERROR -kafka-listener-2 o.s.k.l.LoggingErrorHandler:37 - Error while processing: ConsumerRecord(topic = test.httpclient, partition = 0, offset = 52606, CreateTime = 1503680967030, checksum = 2166925613, serialized key size = -1, serialized value size = 470, key = null, value = [B@6567451e)
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalArgumentException: payload must not be null
    at     org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449) ~[spring-integration-core-.3.8.RELEASE.jar!/:4.3.8.RELEASE]

看起来拆分器在将 LinkedHashMap 转换为 JSON 时遇到困难。有什么方法可以实现转换?

我还尝试将 httpclient 处理器的 outputType 设置为显式 application/json,但似乎没有什么不同。 (在 docs 之后。示例显示带有 shell 引号的 outputType 值,我也尝试过不带引号,没有区别。)

应用程序使用命令加载到 SCDF 服务器(替换为“.”,所以 SO 会接受 link)

app import --uri http://bit-ly/Bacon-RELEASE-stream-applications-kafka-10-maven

又注意到一些事情。调试日志显示内容类型始终为 text/plain,因此它似乎没有选择我明确设置的内容类型。此外,如果我删除 transform 处理器,错误就会消失。我在日志中看到数据,但不是 JSON 格式,只是这样:

{name=first, url=url1}

您能否像您所说的那样尝试设置拆分器的输出内容类型,并删除输入处理器的任何 contentType 定义。 JSON->POJO 应该是从消息头中自动生成的。

通过显式设置 inputType 和 outputType,我能够使它同时适用于纯 Spring 流和包含我的自定义处理器的流。

stream create --name test --definition "time 
  | httpclient <args> --outputType=\"application/json\" 
  | splitter --expression=\"#jsonPath(payload,'$.[0:]')\" 
     --inputType=\"application/json\" 
     --outputType=\"application/json\" 
  | myprocessor 
     --inputType=\"application/json\" 
     --outputType=\"application/json\" 
  | log"

Gary​​ Russell 提到此问题已在 Spring Cloud Stream Ditmars.M2 (1.3.0.M2) 中得到纠正。