仅当消息数量达到计数或消息在频道中一段时间​​后才从频道中消费

Consume from channel only if the number of messages reaches a count or the message is in the channel since a while

我有一个自定义接收器模块,我想仅在消息数量达到一定数量或它们在频道中存在一段时间后才使用来自 input 的消息。简而言之,我想批量推送。

我已经尝试在使用后汇总消息数量并将它们存储在由 SimpleMessageStore 支持的聚合频道中,并让 MessageGroupStoreReaper 检查频道中的消息。

我对这种方法不满意,因为我正在使用消息并将它们存储在内存存储中,我也知道 JDBC 存储,但我不想遵循这个方法,因为 spring XD 中的消息通道由 redis/mq 支持 我想根据我的条件从 input 通道消费。

我目前的bean配置如下图:

<int:aggregator id="messageAggregator" ref="messageAggregatorBean"
    method="aggregate" input-channel="input" output-channel="aggregatorOutputChannel"
    release-strategy="messageReleaseStrategyBean" release-strategy-method="canRelease"
    send-partial-result-on-expiry="true" message-store="resultMessageStore">
</int:aggregator>

<int:service-activator id="contributionIndexerService"
    ref="contributionIndexerBean" method="bulkIndex" input-channel="aggregatorOutChannel" />

<bean id="resultMessageStore"
    class="org.springframework.integration.store.SimpleMessageStore" />

<bean id="resultMessageStoreReaper"
    class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="resultMessageStore" />
    <property name="timeout" value="60000" />
</bean>

<task:scheduled-tasks>
    <task:scheduled ref="resultMessageStoreReaper" method="run"
        fixed-rate="10000" />
</task:scheduled-tasks>

有什么想法或意见吗?

提前致谢。

我不确定您是否能够确定 Broker 队列中的消息数(Redis/RabbitMQ 或什至只是普通的 JMS),更多的是它们在那里的数量。

你绝对应该消费他们来做这样的逻辑。

是的,我认为 Aggregator 可以帮助您。但是对:那一定是 Persistent Message Store.

案例

if they are in the channel since some time

Aggregator建议group-timeout这样的选项释放那些还没有达到releaseStrategy条件的群体,但你会无论如何都喜欢在一段时间内发出它们:http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to