Spring 云流转换表达式语法

Spring cloud stream transform expression syntax

我正在使用本地 Spring Cloud Dataflow 服务器和 shell 试验 Spring Cloud Stream。我得到了像这样的简单示例

dataflow:>stream create --name test --definition "time --trigger.time-unit=SECONDS | log" --deploy

工作正常,因为我每秒在日志中看到一条消息。

2017-08-09 12:51:30,602  INFO -kafka-listener-1 log-sink:202 - 08/09/17 12:51:30
2017-08-09 12:51:31,603  INFO -kafka-listener-1 log-sink:202 - 08/09/17 12:51:31
2017-08-09 12:51:32,605  INFO -kafka-listener-1 log-sink:202 - 08/09/17 12:51:32
.... more log msgs ....

现在我正在尝试扩展此示例以插入转换。我想每秒在日志中查看一次 'hello world',而不是时间,但我不知道如何正确指定表达式。

我试过这个:

dataflow:>stream create --name test --definition "time --trigger.time-unit=SECONDS | transform --transformer.expression='hello world' | log" --deploy

和这个(注意添加'#{}'):

dataflow:>stream create --name test --definition "time --trigger.time-unit=SECONDS | transform --transformer.expression=#{'hello world'} | log" --deploy

但我在转换日志中继续收到错误消息:

Failed to convert property value of type 'java.lang.String' 
  to required type 'org.springframework.expression.Expression' for 
  property 'expression'; nested exception is 
  org.springframework.core.convert.ConverterNotFoundException: No converter 
  found capable of converting from type [java.lang.String] to type 
  [org.springframework.expression.Expression]

我已阅读 Spring Expression Language docs (which I'm not finding helpful since the examples are all in terms of Java code). I've also looked at some of the Spring Cloud Stream test code 中的示例。

我错过了什么?

一旦我 'hello world' 开始工作,我也想从本地数据流服务器的 application.yml 文件中回显一个 属性;欢迎提出建议!


更新:我遵循了 quick-start 1.3.0.M1 里程碑的文档。这些应用程序加载

dataflow:>app import --uri http://bit-ly/Bacon-RELEASE-stream-applications-kafka-10-maven 

正如记录的那样。 (请注意,出于此 post 的目的,将 bit.ly 替换为 bit-ly,因为 Whosebug 不喜欢 bit.ly URLs。)当我点击 bit.ly URL 直接在我的浏览器中下载文件,我看到了这个:

source.file=maven://org.springframework.cloud.stream.app:file-source-kafka-10:1.2.0.RELEASE
source.file.metadata=maven://org.springframework.cloud.stream.app:file-source-kafka-10:jar:metadata:1.2.0.RELEASE
source.ftp=maven://org.springframework.cloud.stream.app:ftp-source-kafka-10:1.2.0.RELEASE
....

我应该使用什么 URL 来下载 1.3.0.M1 应用程序? 1.2.0 应用程序似乎无法与 1.3.0.M1 服务器一起使用是个问题吗?

看起来您在 SCDF 和所有那些开箱即用的应用程序中使用了一些旧版本。

所有应用程序在 transform:

中都有类似的内容
@EnableBinding(Processor.class)
@EnableConfigurationProperties(TransformProcessorProperties.class)
public class TransformProcessorConfiguration {

其中 @EnableBinding 来自 Spring Cloud Stream,其中:

@Configuration
@Import({ BindingServiceConfiguration.class, BindingBeansRegistrar.class, BinderFactoryConfiguration.class,
        SpelExpressionConverterConfiguration.class })
@EnableIntegration
public @interface EnableBinding {

关注SpelExpressionConverterConfiguration。正是这个负责将字符串转换为 Expression 属性。

因此,请务必使用最新的 Bacon SCSt 应用程序:http://docs.spring.io/spring-cloud-stream-app-starters/docs/Bacon.RELEASE/reference/html/

the documentation 'White space and quote rules'

要从 shell 中插入文字表达式,您需要“'hello'\”或“'hello'”。