SpringXD 和 Spring 集成:每 X 分钟从 kafka 主题读取一次,然后发送到另一个主题
SpringXD and Spring Integration: Read from kafka topic every X minutes, then send to another Topic
我正在尝试实施一个解决方案来创建由 kafka 源、桥接模块和 kafka 接收器组成的 SpringXD 流。
所以我有类似的东西:
<channel id="pollable">
<queue />
</channel>
<bridge input-channel="pollable" output-channel="executorChannel">
<poller max-messages-per-poll="5" fixed-rate="5000" />
</bridge>
我的问题是我想以某种方式避开轮询器。基本上是因为当这些消息在队列中时,我想避免将消息保留在内存中。我宁愿每隔 X 分钟从 kafka 读取一次,然后从队列中取出 Y 条消息并将它们发送到下一个主题。
看来我无法摆脱队列,但我的问题是 be:Is 还有其他选择吗?我不喜欢将东西保存在内存中,但我也不想使用此选项:http://docs.spring.io/spring-integration/reference/html/system-management-chapter.html#message-store
将数据保存在内存中不是一个好主意。
您可以根据需要stop()
和start()
通道适配器(KafkaMessageDrivenChannelAdapter
);它会在重新启动时从停止的地方继续。
然而,kafka源码使用的是非常老版本的spring-integration-kafka (1.3.x).
如果您创建自定义源以使用 spring-integration-kafka 2.1.0(使用 kafka 0.10.1.x 客户端),您可以设置 kafka 属性 max.poll.records
限制获取的记录数。
我正在尝试实施一个解决方案来创建由 kafka 源、桥接模块和 kafka 接收器组成的 SpringXD 流。
所以我有类似的东西:
<channel id="pollable">
<queue />
</channel>
<bridge input-channel="pollable" output-channel="executorChannel">
<poller max-messages-per-poll="5" fixed-rate="5000" />
</bridge>
我的问题是我想以某种方式避开轮询器。基本上是因为当这些消息在队列中时,我想避免将消息保留在内存中。我宁愿每隔 X 分钟从 kafka 读取一次,然后从队列中取出 Y 条消息并将它们发送到下一个主题。
看来我无法摆脱队列,但我的问题是 be:Is 还有其他选择吗?我不喜欢将东西保存在内存中,但我也不想使用此选项:http://docs.spring.io/spring-integration/reference/html/system-management-chapter.html#message-store
将数据保存在内存中不是一个好主意。
您可以根据需要stop()
和start()
通道适配器(KafkaMessageDrivenChannelAdapter
);它会在重新启动时从停止的地方继续。
然而,kafka源码使用的是非常老版本的spring-integration-kafka (1.3.x).
如果您创建自定义源以使用 spring-integration-kafka 2.1.0(使用 kafka 0.10.1.x 客户端),您可以设置 kafka 属性 max.poll.records
限制获取的记录数。