关于组合处理器和接收器的问题
questions about composed processor and sink
我正在使用 Spring Could Data Flow 1.7.2.RELEASE 并尝试遵循 this blog post
创建 "a combination of processor + sink into a single application: “a new sink”."
当我按照博客的示例构建我的代码时,我遇到了问题,我认为这是因为博客的示例使用了类似于处理器的 java.util.Function。
我想我应该使用 java.util.Consumer 因为我正在尝试将现有的 Sink 更改为 Processor-Sink hybrid
我的 class 看起来像这样:
@EnableBinding(Sink.class)
public class SampleCombinedSink extends Something {
String modifiedPayload;
Logger log;
Consumer<String> consumer = i -> { modifiedPayload="STUFF ADDED BY CONSUMER i=["+i+"]"; };
public void accept(String s){
log.info("SampleCombinedSink.accept() s="+s);
}
@StreamListener(Sink.INPUT)
public void doSink(String payload) {
consumer.accept(payload);
log.info("SampleCombinedSink.doSink() Payload received.");
log.info("SampleCombinedSink.doSink() payload="+ payload);
log.info("SampleCombinedSink.doSink() modifiedPayload="+ modifiedPayload);
}
}
我的输出如下所示:
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:53.330+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:53.330+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:54.332+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:54.332+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:55.333+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:55.333+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:56.313+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:56.313+0000]
My Source 每秒发出一个时间戳。
我对我的消费者感到困惑。
Consumer<String> consumer = i -> { modifiedPayload="STUFF ADDED BY CONSUMER i=["+i+"]"; };
我以为我可以做类似的事情:
Consumer<String> consumer = i -> { i="STUFF ADDED BY CONSUMER i=["+i+"]"; };
然后是
中的负载
@StreamListener(Sink.INPUT)
public void doSink(String payload) {
将包含 "STUFF ADDED BY CONSUMER i=[timestamp]"
没有。
我想将输入更改为 doSink,并通过添加 "STUFF ADDED BY CONSUMER" 进行更改,以便当输入到达 doSink(String payload) 时,有效负载将包含 "STUFF ADDED BY CONSUMER i=[timestamp]"
如何实现?
在这种情况下,您不必更改 Sink
应用程序的任何内容,只需在 Sink
应用程序端使用相同的功能管道即可。
例如,制作:
a combination of processor + sink into a single application: “a new sink”."
您所需要的只是将 function
beans 作为 Sink
应用程序的一部分,或者甚至将 function
beans 放在单独的工件中,但在 classpath
Sink
个应用程序。完成后,您可以为 Sink
应用程序定义 spring.cloud.stream.function.definition
。
您可以查看此 here. The app log-composed
has 函数 bean 定义的示例。
给运行样本:
dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar
Successfully registered application 'source:http-transformer'
dataflow:>app register --name log-composed --type sink --uri file:///Users/igopinathan/.m2/repository/org/springframework/cloud/stream/app/log-composed/2.1.0.BUILD-SNAPSHOT/log-composed-2.1.0.BUILD-SNAPSHOT.jar
Successfully registered application 'sink:log-composed'
dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log-composed"
Created new stream 'helloComposed'
dataflow:>stream deploy helloComposed --properties "app.log-composed.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"
Deployment request has been sent for stream 'helloComposed'
dataflow:>http post --data "friend" --target "http://localhost:9001"
在 stream deploy
命令中,您可以看到用于指定 function composition
的应用程序是 log-composed
。
我正在使用 Spring Could Data Flow 1.7.2.RELEASE 并尝试遵循 this blog post
创建 "a combination of processor + sink into a single application: “a new sink”."
当我按照博客的示例构建我的代码时,我遇到了问题,我认为这是因为博客的示例使用了类似于处理器的 java.util.Function。
我想我应该使用 java.util.Consumer 因为我正在尝试将现有的 Sink 更改为 Processor-Sink hybrid
我的 class 看起来像这样:
@EnableBinding(Sink.class)
public class SampleCombinedSink extends Something {
String modifiedPayload;
Logger log;
Consumer<String> consumer = i -> { modifiedPayload="STUFF ADDED BY CONSUMER i=["+i+"]"; };
public void accept(String s){
log.info("SampleCombinedSink.accept() s="+s);
}
@StreamListener(Sink.INPUT)
public void doSink(String payload) {
consumer.accept(payload);
log.info("SampleCombinedSink.doSink() Payload received.");
log.info("SampleCombinedSink.doSink() payload="+ payload);
log.info("SampleCombinedSink.doSink() modifiedPayload="+ modifiedPayload);
}
}
我的输出如下所示:
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:53.330+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:53.330+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:54.332+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:54.332+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:55.333+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:55.333+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:56.313+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:56.313+0000]
My Source 每秒发出一个时间戳。
我对我的消费者感到困惑。
Consumer<String> consumer = i -> { modifiedPayload="STUFF ADDED BY CONSUMER i=["+i+"]"; };
我以为我可以做类似的事情:
Consumer<String> consumer = i -> { i="STUFF ADDED BY CONSUMER i=["+i+"]"; };
然后是
中的负载@StreamListener(Sink.INPUT)
public void doSink(String payload) {
将包含 "STUFF ADDED BY CONSUMER i=[timestamp]"
没有。
我想将输入更改为 doSink,并通过添加 "STUFF ADDED BY CONSUMER" 进行更改,以便当输入到达 doSink(String payload) 时,有效负载将包含 "STUFF ADDED BY CONSUMER i=[timestamp]"
如何实现?
在这种情况下,您不必更改 Sink
应用程序的任何内容,只需在 Sink
应用程序端使用相同的功能管道即可。
例如,制作:
a combination of processor + sink into a single application: “a new sink”."
您所需要的只是将 function
beans 作为 Sink
应用程序的一部分,或者甚至将 function
beans 放在单独的工件中,但在 classpath
Sink
个应用程序。完成后,您可以为 Sink
应用程序定义 spring.cloud.stream.function.definition
。
您可以查看此 here. The app log-composed
has 函数 bean 定义的示例。
给运行样本:
dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar
Successfully registered application 'source:http-transformer'
dataflow:>app register --name log-composed --type sink --uri file:///Users/igopinathan/.m2/repository/org/springframework/cloud/stream/app/log-composed/2.1.0.BUILD-SNAPSHOT/log-composed-2.1.0.BUILD-SNAPSHOT.jar
Successfully registered application 'sink:log-composed'
dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log-composed"
Created new stream 'helloComposed'
dataflow:>stream deploy helloComposed --properties "app.log-composed.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"
Deployment request has been sent for stream 'helloComposed'
dataflow:>http post --data "friend" --target "http://localhost:9001"
在 stream deploy
命令中,您可以看到用于指定 function composition
的应用程序是 log-composed
。