同一 Kinesis 流的多个不同消费者
Multiple different consumers of same Kinesis stream
我有一个 Kinesis 生产者,它将单一类型的消息写入流。我想在多个完全不同的消费者应用程序中处理这个流。因此,pub/sub 与给定 topic/stream 的单个发布者。我还想利用检查点来确保每个消费者都处理写入流的每条消息。
最初,我为所有消费者和生产者使用相同的应用程序名称。但是,一旦我启动了多个消费者,我就开始收到以下错误:
com.amazonaws.services.kinesis.model.InvalidArgumentException: StartingSequenceNumber 49564236296344566565977952725717230439257668853369405442 used in GetShardIterator on shard shardId-000000000000 in stream PackageCreated under account ************ is invalid because it did not come from this stream. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: ..)
这似乎是因为消费者在使用相同的应用程序名称时与他们的检查点发生冲突。
通过阅读文档,似乎 pub/sub 使用检查点的唯一方法是让每个消费者应用程序都有一个流,这需要每个生产者了解所有可能的消费者。这比我想要的更紧密;这真的只是一个队列。
Kafka 似乎支持我想要的:任意消费给定的 topic/partition,因为消费者完全控制自己的检查点。如果我想要 pub/sub 带检查点,我是转向 Kafka 还是其他选择的唯一选择?
我的 RecordProcessor 代码,在每个消费者中都是相同的:
override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
log.trace("Received record(s) from kinesis")
for {
record <- processRecordsInput.getRecords
json <- jawn.parseByteBuffer(record.getData).toOption
msg <- decode[T](json.toString).toOption
} yield subscriber ! msg
processRecordsInput.getCheckpointer.checkpoint()
}
代码解析消息并将其发送给订阅者。现在,我只是将所有消息标记为已成功接收。我可以在 AWS Kinesis 仪表板上看到正在发送的消息,但没有读取发生,大概是因为每个应用程序都有自己的 AppName 并且看不到任何其他消息。
支持您想要的模式,即一个发布者到一个 Kinesis 流的多个消费者。您不需要每个消费者单独的流。
你是怎么做到的?您需要为每个消费者提供不同的应用程序名称。这样,一个消费者的检查点信息就不会与另一个消费者的信息发生冲突。
查看对此的第一个回复:https://forums.aws.amazon.com/message.jspa?messageID=554375
我有一个 Kinesis 生产者,它将单一类型的消息写入流。我想在多个完全不同的消费者应用程序中处理这个流。因此,pub/sub 与给定 topic/stream 的单个发布者。我还想利用检查点来确保每个消费者都处理写入流的每条消息。
最初,我为所有消费者和生产者使用相同的应用程序名称。但是,一旦我启动了多个消费者,我就开始收到以下错误:
com.amazonaws.services.kinesis.model.InvalidArgumentException: StartingSequenceNumber 49564236296344566565977952725717230439257668853369405442 used in GetShardIterator on shard shardId-000000000000 in stream PackageCreated under account ************ is invalid because it did not come from this stream. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: ..)
这似乎是因为消费者在使用相同的应用程序名称时与他们的检查点发生冲突。
通过阅读文档,似乎 pub/sub 使用检查点的唯一方法是让每个消费者应用程序都有一个流,这需要每个生产者了解所有可能的消费者。这比我想要的更紧密;这真的只是一个队列。
Kafka 似乎支持我想要的:任意消费给定的 topic/partition,因为消费者完全控制自己的检查点。如果我想要 pub/sub 带检查点,我是转向 Kafka 还是其他选择的唯一选择?
我的 RecordProcessor 代码,在每个消费者中都是相同的:
override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
log.trace("Received record(s) from kinesis")
for {
record <- processRecordsInput.getRecords
json <- jawn.parseByteBuffer(record.getData).toOption
msg <- decode[T](json.toString).toOption
} yield subscriber ! msg
processRecordsInput.getCheckpointer.checkpoint()
}
代码解析消息并将其发送给订阅者。现在,我只是将所有消息标记为已成功接收。我可以在 AWS Kinesis 仪表板上看到正在发送的消息,但没有读取发生,大概是因为每个应用程序都有自己的 AppName 并且看不到任何其他消息。
支持您想要的模式,即一个发布者到一个 Kinesis 流的多个消费者。您不需要每个消费者单独的流。
你是怎么做到的?您需要为每个消费者提供不同的应用程序名称。这样,一个消费者的检查点信息就不会与另一个消费者的信息发生冲突。
查看对此的第一个回复:https://forums.aws.amazon.com/message.jspa?messageID=554375