以固定的时间间隔从输入流中消费消息
Consume messages from Input stream at fixed intervals
我正在使用流监听器来使用来自 amazon kinesis 流的消息。
@StreamListener(Processor.Input)
public void receiveMessage(String message) {
//process
}
- 是否可以实现一个每 10 分钟从输入流中获取一次所有消息的侦听器
- 当有消费者群体时,它会如何表现。 Consumer 组中的所有容器是否都获得不同的消息列表。
我试过 @Poller
但它没有任何输入通道。
任何帮助都将非常有用。
AWS Kinesis 没有这样的轮询消费者实现。
但是你可以用这样的组合来模拟它:
listenerMode = batch
您将在 @StreamListener
中收到有效载荷作为 List<com.amazonaws.services.kinesis.model.Record>
默认的recordsLimit
是10000
。我认为这应该足以进行尽可能多的投票。
idleBetweenPolls = 600000 / 10 mins
你不会比这个选项更早进入下一批。
不,组中的每个消费者都会得到自己的列表。但这不符合您对轮询所有内容的原始要求。因此,同一组中的所有其他消费者都将闲置 - 没有什么可以为他们轮询!
我正在使用流监听器来使用来自 amazon kinesis 流的消息。
@StreamListener(Processor.Input)
public void receiveMessage(String message) {
//process
}
- 是否可以实现一个每 10 分钟从输入流中获取一次所有消息的侦听器
- 当有消费者群体时,它会如何表现。 Consumer 组中的所有容器是否都获得不同的消息列表。
我试过 @Poller
但它没有任何输入通道。
任何帮助都将非常有用。
AWS Kinesis 没有这样的轮询消费者实现。 但是你可以用这样的组合来模拟它:
listenerMode = batch
您将在 @StreamListener
中收到有效载荷作为 List<com.amazonaws.services.kinesis.model.Record>
默认的recordsLimit
是10000
。我认为这应该足以进行尽可能多的投票。
idleBetweenPolls = 600000 / 10 mins
你不会比这个选项更早进入下一批。
不,组中的每个消费者都会得到自己的列表。但这不符合您对轮询所有内容的原始要求。因此,同一组中的所有其他消费者都将闲置 - 没有什么可以为他们轮询!