无法让聚合器工作
Unable to get Aggregator to work
我正在尝试了解聚合器的基础知识。以下是我要实现的用例:
1) 从队列中读取消息(订单详细信息)。
<?xml version="1.0" encoding="UTF-8"?>
<order xmlns="http://www.example.org/orders">
<orderItem>
<isbn>12333454443</isbn>
<quantity>4</quantity>
</orderItem>
<orderItem>
<isbn>545656777</isbn>
<quantity>50</quantity>
</orderItem>
..
..
</order>
一个 order 消息将包含多个 orderItem。我们可以预期队列中有数百条 order 条消息。
2) 最终结果::
a) 每个 orderitem 都应该写入文件。
b) 4 此类文件应写入唯一文件夹。
举个例子,假设我们有两条 order 消息——每条消息包含三个 orderitem.
所以我们需要创建 2 个文件夹:
在"folder 1"中应该有4个文件(每个文件1个orderitem)
在"folder 2"中应该有2个文件(每个文件1个orderitem)。这里为简单起见,我们假设不再有 order 消息,我们可以在 5 分钟后写入。
实施:
- 我能够从队列 (websphere MQ) 中读取消息并成功解组消息。
- 使用拆分器根据 orderitem 计数拆分消息。
- 使用聚合器将消息分组为 4 个大小。
我无法让聚合器按照我的理解工作。
- 我在 4 orderitem 时推送一个 order,消息正在正确聚合。
- 我用 5 个 orderitem 推送一个 order,前 4 个正在聚合,但最后一个被发送到丢弃通道。这是预期的,因为 MessageGroup 已释放,所以最后一条消息被丢弃。
- 我推送了两个 订单,每个包含 2 个 orderitem。最后2个orderitem发送到discard channel.
关联策略是硬编码的 (OrderAggregator.java),但上述情况应该有效。
需要有关如何实现此用例的指示,我可以将它们分组为 4 个并写入唯一的文件夹。
请注意orderitem都是独立的图书订单,它们之间没有任何关系。
配置如下。
spring-bean.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans">
<int:channel id="mqInbound"/>
<int:channel id="item"/>
<int:channel id="itemList"/>
<int:channel id="aggregatorDiscardChannel"/>
<int-jms:message-driven-channel-adapter id="jmsIn"
channel="mqInbound"
destination="requestQueue"
message- converter="orderMessageConverter"/>
<int:splitter input-channel="mqInbound" output-channel="item" expression="payload.orderItem"/>
<int:chain id="aggregateList" input-channel="item" output-channel="itemList" >
<int:header-enricher>
<int:header name="sequenceSize" expression="4" overwrite="true"/>
</int:header-enricher>
<int:aggregator correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders" discard-channel="aggregatorDiscardChannel" />
</int:chain>
<int:service-activator input-channel="itemList" ref="displayAggregatedList" method="display"/>
<int:service-activator input-channel="aggregatorDiscardChannel" ref="displayAggregatedList" method="displayDiscarded"/>
<bean id="orderAggregator" class="com.samples.Aggregator.OrderAggregator"/>
<bean id="displayAggregatedList" class="com.samples.Aggregator.DisplayAggregatedList"/>
...
....
</beans>
OrderAggregator.java
public class OrderAggregator {
@Aggregator
public List<OrderItemType> sendList(List<OrderItemType> orderItemTypeList) {
return orderItemTypeList;
}
@CorrelationStrategy
public String groupOrders( OrderItemType orderItemType) {
return "items";
}
}
DisplayAggregatedList.java
public class DisplayAggregatedList {
public void display(List <OrderItemType> orderItemTypeList) {
System.out.println("######## Display Aggregated ##############");
for(OrderItemType oit : orderItemTypeList) {
System.out.println("### Isbn :" + oit.getIsbn() + ":: Quantity :" + oit.getQuantity());
}
}
public void displayDiscarded(Message<?> message) {
System.out.println("######## Display Discarded ##############" + message);
}
}
你需要的是expire-groups-upon-completion
:
When set to true (default false), completed groups are removed from the message store, allowing subsequent messages with the same correlation to form a new group. The default behavior is to send messages with the same correlation as a completed group to the discard-channel.
如果您无论如何都需要发布未完成的组(例如还剩 2 个订单),请考虑使用 group-timeout
:http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
请使用 expire-groups-upon-completion="true" 并考虑使用 MessageCountReleaseStrategy` 作为发布策略 – Artem Bilan
我正在尝试了解聚合器的基础知识。以下是我要实现的用例:
1) 从队列中读取消息(订单详细信息)。
<?xml version="1.0" encoding="UTF-8"?>
<order xmlns="http://www.example.org/orders">
<orderItem>
<isbn>12333454443</isbn>
<quantity>4</quantity>
</orderItem>
<orderItem>
<isbn>545656777</isbn>
<quantity>50</quantity>
</orderItem>
..
..
</order>
一个 order 消息将包含多个 orderItem。我们可以预期队列中有数百条 order 条消息。
2) 最终结果::
a) 每个 orderitem 都应该写入文件。
b) 4 此类文件应写入唯一文件夹。
举个例子,假设我们有两条 order 消息——每条消息包含三个 orderitem.
所以我们需要创建 2 个文件夹:
在"folder 1"中应该有4个文件(每个文件1个orderitem)
在"folder 2"中应该有2个文件(每个文件1个orderitem)。这里为简单起见,我们假设不再有 order 消息,我们可以在 5 分钟后写入。
实施:
- 我能够从队列 (websphere MQ) 中读取消息并成功解组消息。
- 使用拆分器根据 orderitem 计数拆分消息。
- 使用聚合器将消息分组为 4 个大小。
我无法让聚合器按照我的理解工作。
- 我在 4 orderitem 时推送一个 order,消息正在正确聚合。
- 我用 5 个 orderitem 推送一个 order,前 4 个正在聚合,但最后一个被发送到丢弃通道。这是预期的,因为 MessageGroup 已释放,所以最后一条消息被丢弃。
- 我推送了两个 订单,每个包含 2 个 orderitem。最后2个orderitem发送到discard channel.
关联策略是硬编码的 (OrderAggregator.java),但上述情况应该有效。
需要有关如何实现此用例的指示,我可以将它们分组为 4 个并写入唯一的文件夹。 请注意orderitem都是独立的图书订单,它们之间没有任何关系。
配置如下。
spring-bean.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans">
<int:channel id="mqInbound"/>
<int:channel id="item"/>
<int:channel id="itemList"/>
<int:channel id="aggregatorDiscardChannel"/>
<int-jms:message-driven-channel-adapter id="jmsIn"
channel="mqInbound"
destination="requestQueue"
message- converter="orderMessageConverter"/>
<int:splitter input-channel="mqInbound" output-channel="item" expression="payload.orderItem"/>
<int:chain id="aggregateList" input-channel="item" output-channel="itemList" >
<int:header-enricher>
<int:header name="sequenceSize" expression="4" overwrite="true"/>
</int:header-enricher>
<int:aggregator correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders" discard-channel="aggregatorDiscardChannel" />
</int:chain>
<int:service-activator input-channel="itemList" ref="displayAggregatedList" method="display"/>
<int:service-activator input-channel="aggregatorDiscardChannel" ref="displayAggregatedList" method="displayDiscarded"/>
<bean id="orderAggregator" class="com.samples.Aggregator.OrderAggregator"/>
<bean id="displayAggregatedList" class="com.samples.Aggregator.DisplayAggregatedList"/>
...
....
</beans>
OrderAggregator.java
public class OrderAggregator {
@Aggregator
public List<OrderItemType> sendList(List<OrderItemType> orderItemTypeList) {
return orderItemTypeList;
}
@CorrelationStrategy
public String groupOrders( OrderItemType orderItemType) {
return "items";
}
}
DisplayAggregatedList.java
public class DisplayAggregatedList {
public void display(List <OrderItemType> orderItemTypeList) {
System.out.println("######## Display Aggregated ##############");
for(OrderItemType oit : orderItemTypeList) {
System.out.println("### Isbn :" + oit.getIsbn() + ":: Quantity :" + oit.getQuantity());
}
}
public void displayDiscarded(Message<?> message) {
System.out.println("######## Display Discarded ##############" + message);
}
}
你需要的是expire-groups-upon-completion
:
When set to true (default false), completed groups are removed from the message store, allowing subsequent messages with the same correlation to form a new group. The default behavior is to send messages with the same correlation as a completed group to the discard-channel.
如果您无论如何都需要发布未完成的组(例如还剩 2 个订单),请考虑使用 group-timeout
:http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
请使用 expire-groups-upon-completion="true" 并考虑使用 MessageCountReleaseStrategy` 作为发布策略 – Artem Bilan