网关未设置 replyChannel header
Gateway not setting the replyChannel header
我目前正在开发一个使用 Spring Integration 4.3.14 构建的项目,我们决定尝试使用 DSL,但我在尝试集成不同的子流程时遇到了问题。
我定义了以下 IntegrationFlow:
@Bean
public IntegrationFlow mainFlow() {
return IntegrationFlows
.from(
databaseSource(),
c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
.split()
.log()
.gateway(f -> f
.transform(Transformer::transform)
.transform(AnotherTransformer::transform),
e -> e
.errorChannel("transformErrorChannel"))
.gateway(f -> f
.<MyEntity>handle((p, h) -> this.doSomething(p))
.<MyEntity>handle((p, h) -> this.doOtherThing(p)),
e -> e
.errorChannel("doErrorChannel"))
.channel("nullChannel")
.get();
}
所有 transform
和 handle
调用的方法都是 non-void 和 return non-null 值。我们采用这种方法的主要原因是有两个不同的通道来处理错误,具体取决于错误发生的流程部分,因此我们可以采取相应的行动。
然而,当我尝试 运行 这段代码并在数据库中插入一条记录并且轮询器拾取它时,它永远不会超出第一个网关。我只有这个日志行:
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@1863292e
2018-06-06 11:43:58.864 INFO 6492 --- [ask-scheduler-1] c.e.transformation.Transformer : Performing transformation.
2018-06-06 11:43:58.864 INFO 6492 --- [ask-scheduler-1] c.e.transformation.AnotherTransformer : Performing another transformation.
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : started org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@433a796
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : started org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f
很明显消息确实到达了第一个网关,但显然没有传递到第二个网关。
在启动过程中,我看到 SI 创建了两个子流(#0 和 #1)和两个通道(我猜是每个操作一个),每个通道有 1 个订阅者。
我还尝试将定义更改为以下内容:
@Bean
public IntegrationFlow getRecords() {
return IntegrationFlows
.from(
databaseSource(),
c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
.split()
.log()
.gateway(f -> f
.transform(Transformer::transform)
.transform(AnotherTransformer::transform),
e -> e
.errorChannel("transformErrorChannel")
.replyChannel("doThingsChannel"))
.get();
}
@Bean
public IntegrationFlow doThings() {
return IntegrationFlows
.from(
"doThingsChannel")
.gateway(f -> f
.<MyEntity>handle((p, h) -> this.doSomehting(p))
.<MyEntity>handle((p, h) -> this.doOtherThing(p)),
e -> e
.errorChannel("doErrorChannel"))
.get();
}
但最终遇到了同样的问题,都在 GatewayEndpointSpec
上设置 replyChannel
或在网关后向 getRecords
流添加显式 .channel
。
我刚刚在 Spring 集成 Java DSL 项目中完成了这个测试用例:
@Test
public void testGateways() {
IntegrationFlow flow = f -> f
.gateway(sf -> sf
.transform(p -> "foo#" + p)
.transform(p -> "bar#" + p))
.gateway(sf -> sf
.handle((p, h) -> "handle1:" + p)
.handle((p, h) -> "handle2:" + p))
.handle(System.out::println);
IntegrationFlowRegistration flowRegistration = this.integrationFlowContext.registration(flow).register();
flowRegistration.getInputChannel()
.send(new GenericMessage<>("test"));
flowRegistration.destroy();
}
我的输出是这样的:
GenericMessage [payload=handle2:handle1:bar#foo#test, headers={id=ae09df5c-f63e-4b68-d73c-29b85f3689a8, timestamp=1528314852110}]
因此,两个网关都按预期工作,并且应用了所有转换器和处理程序。再加上最后一个网关的结果被轮询到最后 System.out
步骤的主流。
不确定你的情况是怎么回事:只是觉得你的 .transform(AnotherTransformer::transform)
没有 return 价值或那里发生了其他任何事情。
关于 replyChannel
选项。它不是将网关的结果发送到哪里。这是等待回复 return:
的地方
/**
* Specify the channel from which reply messages will be received; overrides the
* encompassing gateway's default reply channel.
* @return the channel name.
*/
String replyChannel() default "";
我目前正在开发一个使用 Spring Integration 4.3.14 构建的项目,我们决定尝试使用 DSL,但我在尝试集成不同的子流程时遇到了问题。
我定义了以下 IntegrationFlow:
@Bean
public IntegrationFlow mainFlow() {
return IntegrationFlows
.from(
databaseSource(),
c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
.split()
.log()
.gateway(f -> f
.transform(Transformer::transform)
.transform(AnotherTransformer::transform),
e -> e
.errorChannel("transformErrorChannel"))
.gateway(f -> f
.<MyEntity>handle((p, h) -> this.doSomething(p))
.<MyEntity>handle((p, h) -> this.doOtherThing(p)),
e -> e
.errorChannel("doErrorChannel"))
.channel("nullChannel")
.get();
}
所有 transform
和 handle
调用的方法都是 non-void 和 return non-null 值。我们采用这种方法的主要原因是有两个不同的通道来处理错误,具体取决于错误发生的流程部分,因此我们可以采取相应的行动。
然而,当我尝试 运行 这段代码并在数据库中插入一条记录并且轮询器拾取它时,它永远不会超出第一个网关。我只有这个日志行:
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@1863292e
2018-06-06 11:43:58.864 INFO 6492 --- [ask-scheduler-1] c.e.transformation.Transformer : Performing transformation.
2018-06-06 11:43:58.864 INFO 6492 --- [ask-scheduler-1] c.e.transformation.AnotherTransformer : Performing another transformation.
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : started org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@433a796
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : started org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f
很明显消息确实到达了第一个网关,但显然没有传递到第二个网关。
在启动过程中,我看到 SI 创建了两个子流(#0 和 #1)和两个通道(我猜是每个操作一个),每个通道有 1 个订阅者。
我还尝试将定义更改为以下内容:
@Bean
public IntegrationFlow getRecords() {
return IntegrationFlows
.from(
databaseSource(),
c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
.split()
.log()
.gateway(f -> f
.transform(Transformer::transform)
.transform(AnotherTransformer::transform),
e -> e
.errorChannel("transformErrorChannel")
.replyChannel("doThingsChannel"))
.get();
}
@Bean
public IntegrationFlow doThings() {
return IntegrationFlows
.from(
"doThingsChannel")
.gateway(f -> f
.<MyEntity>handle((p, h) -> this.doSomehting(p))
.<MyEntity>handle((p, h) -> this.doOtherThing(p)),
e -> e
.errorChannel("doErrorChannel"))
.get();
}
但最终遇到了同样的问题,都在 GatewayEndpointSpec
上设置 replyChannel
或在网关后向 getRecords
流添加显式 .channel
。
我刚刚在 Spring 集成 Java DSL 项目中完成了这个测试用例:
@Test
public void testGateways() {
IntegrationFlow flow = f -> f
.gateway(sf -> sf
.transform(p -> "foo#" + p)
.transform(p -> "bar#" + p))
.gateway(sf -> sf
.handle((p, h) -> "handle1:" + p)
.handle((p, h) -> "handle2:" + p))
.handle(System.out::println);
IntegrationFlowRegistration flowRegistration = this.integrationFlowContext.registration(flow).register();
flowRegistration.getInputChannel()
.send(new GenericMessage<>("test"));
flowRegistration.destroy();
}
我的输出是这样的:
GenericMessage [payload=handle2:handle1:bar#foo#test, headers={id=ae09df5c-f63e-4b68-d73c-29b85f3689a8, timestamp=1528314852110}]
因此,两个网关都按预期工作,并且应用了所有转换器和处理程序。再加上最后一个网关的结果被轮询到最后 System.out
步骤的主流。
不确定你的情况是怎么回事:只是觉得你的 .transform(AnotherTransformer::transform)
没有 return 价值或那里发生了其他任何事情。
关于 replyChannel
选项。它不是将网关的结果发送到哪里。这是等待回复 return:
/**
* Specify the channel from which reply messages will be received; overrides the
* encompassing gateway's default reply channel.
* @return the channel name.
*/
String replyChannel() default "";