Spring XD 在一个模块中转换和路由消息
Spring XD transform and route message in one module
如何在 1 个模块中转换和路由消息。
我创建了一个这样的模块:
xd-ta-core.xml
<int:channel id="input"/>
<int:channel id="output"/>
<beans:bean id="taCore" class="com.company.threatanalyzer.xd.plugin.XdTaCore">
<constructor-arg value="${inputLogsType}"/>
<constructor-arg value="${zookeeperHost}"/>
<property name="persistChannel" value="${persistChannel}"/>
<property name="correlateChannel" value="${correlateChannel}"/>
</beans:bean>
<int:transformer input-channel="input" output-channel="output" ref="taCore" method="transform"/>
<int:router input-channel="output" ref="taCore" method="route"/>
XdTaCore.java
public Event transform(String payload) {
Event event = tupleDeserializer.fromJson(payload);
event.setId(UUID.randomUUID().toString());
logger.warn("=======TA Core Transform======= {} ===========", event.getId());
return event;
}
public List<String> route(Event payload) {
List<String> outChannels = new ArrayList<>();
List<PolicyAction> policyActions = policyEnforcer.enforce(payload);
if (policyActions.contains(PolicyAction.PERSIST)) {
outChannels.add(persistChannel);
}
if (policyActions.contains(PolicyAction.CORRELATE)) {
outChannels.add(correlateChannel);
}
logger.warn("=======TA Core Route======= {} ========== {} ===============", payload.getId(), policyActions);
return outChannels;
}
但在日志中我是这样的:
2016-09-10T14:35:26+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Transform======= dc6209e2-79ff-4b88-a245-4b12a603058c ========== {}
2016-09-10T14:35:26+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Route======= dc6209e2-79ff-4b88-a245-4b12a603058c ========== [CORRELATE, PERSIST] ===========
2016-09-10T14:35:26+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Transform======= fabcf574-32b1-4510-96c5-8be44ae8b757 ========== {}
2016-09-10T14:35:27+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Transform======= 4f07e5e8-7f0e-4e48-a6a0-813e57dd081c ========== {}
2016-09-10T14:35:27+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Route======= 4f07e5e8-7f0e-4e48-a6a0-813e57dd081c ========== [CORRELATE, PERSIST] ===========
2016-09-10T14:35:27+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Transform======= 6393be33-6188-405d-b0b3-8e9f87ec4af0 ========== {}
2016-09-10T14:35:28+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Transform======= 58b17358-58b8-46f3-894a-b7b5811564bc ========== {}
2016-09-10T14:35:28+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Route======= 58b17358-58b8-46f3-894a-b7b5811564bc ========== [CORRELATE, PERSIST] ===========
2016-09-10T14:35:28+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Transform======= e92e153d-8f39-4ae5-9c82-14a7f141aef5 ========== {}
如您所见,每条消息的转换函数 运行s 但其中一半的路由函数仅为 运行。
另一个问题是哪个更适合性能:
- 一个模块做10行代码
- 5个模块,每个模块做2行代码
频道 output
有两个订阅者 - 消息总线和您的路由器。在这种情况下,消息将交替发送给每个人。
您需要在变压器和路由器之间使用第三个中间通道(例如 toRouter
)...
<int:transformer input-channel="input" output-channel="toRouter" ref="taCore" method="transform"/>
<int:router input-channel="toRouter" ref="taCore" method="route"/>
变压器的输出通道就是路由器的输入通道。
And another question is which one is better for performance:
视情况而定;对于像这样的简单组件,最好使用单个模块。对于更复杂的场景,您可能希望独立扩展组件,单独的模块可能更好。
如何在 1 个模块中转换和路由消息。
我创建了一个这样的模块:
xd-ta-core.xml
<int:channel id="input"/>
<int:channel id="output"/>
<beans:bean id="taCore" class="com.company.threatanalyzer.xd.plugin.XdTaCore">
<constructor-arg value="${inputLogsType}"/>
<constructor-arg value="${zookeeperHost}"/>
<property name="persistChannel" value="${persistChannel}"/>
<property name="correlateChannel" value="${correlateChannel}"/>
</beans:bean>
<int:transformer input-channel="input" output-channel="output" ref="taCore" method="transform"/>
<int:router input-channel="output" ref="taCore" method="route"/>
XdTaCore.java
public Event transform(String payload) {
Event event = tupleDeserializer.fromJson(payload);
event.setId(UUID.randomUUID().toString());
logger.warn("=======TA Core Transform======= {} ===========", event.getId());
return event;
}
public List<String> route(Event payload) {
List<String> outChannels = new ArrayList<>();
List<PolicyAction> policyActions = policyEnforcer.enforce(payload);
if (policyActions.contains(PolicyAction.PERSIST)) {
outChannels.add(persistChannel);
}
if (policyActions.contains(PolicyAction.CORRELATE)) {
outChannels.add(correlateChannel);
}
logger.warn("=======TA Core Route======= {} ========== {} ===============", payload.getId(), policyActions);
return outChannels;
}
但在日志中我是这样的:
2016-09-10T14:35:26+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Transform======= dc6209e2-79ff-4b88-a245-4b12a603058c ========== {}
2016-09-10T14:35:26+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Route======= dc6209e2-79ff-4b88-a245-4b12a603058c ========== [CORRELATE, PERSIST] ===========
2016-09-10T14:35:26+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Transform======= fabcf574-32b1-4510-96c5-8be44ae8b757 ========== {}
2016-09-10T14:35:27+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Transform======= 4f07e5e8-7f0e-4e48-a6a0-813e57dd081c ========== {}
2016-09-10T14:35:27+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Route======= 4f07e5e8-7f0e-4e48-a6a0-813e57dd081c ========== [CORRELATE, PERSIST] ===========
2016-09-10T14:35:27+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Transform======= 6393be33-6188-405d-b0b3-8e9f87ec4af0 ========== {}
2016-09-10T14:35:28+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Transform======= 58b17358-58b8-46f3-894a-b7b5811564bc ========== {}
2016-09-10T14:35:28+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Route======= 58b17358-58b8-46f3-894a-b7b5811564bc ========== [CORRELATE, PERSIST] ===========
2016-09-10T14:35:28+0430 1.3.1.RELEASE WARN pool-27-thread-1 plugin.XdTaCore - =======TA Core Transform======= e92e153d-8f39-4ae5-9c82-14a7f141aef5 ========== {}
如您所见,每条消息的转换函数 运行s 但其中一半的路由函数仅为 运行。
另一个问题是哪个更适合性能:
- 一个模块做10行代码
- 5个模块,每个模块做2行代码
频道 output
有两个订阅者 - 消息总线和您的路由器。在这种情况下,消息将交替发送给每个人。
您需要在变压器和路由器之间使用第三个中间通道(例如 toRouter
)...
<int:transformer input-channel="input" output-channel="toRouter" ref="taCore" method="transform"/>
<int:router input-channel="toRouter" ref="taCore" method="route"/>
变压器的输出通道就是路由器的输入通道。
And another question is which one is better for performance:
视情况而定;对于像这样的简单组件,最好使用单个模块。对于更复杂的场景,您可能希望独立扩展组件,单独的模块可能更好。