使用reactive-kafka有条件地处理消息
Using reactive-kafka to conditionally process messages
我一直在尝试使用reactive-kafka,但我遇到了条件处理问题,我没有找到满意的答案。
基本上,我正在尝试使用一个包含大量消息(每天大约 100 亿条消息)的 kafka 主题,并且只处理其中一些消息(每天几千条)基于一些 属性 的消息,然后将我的消息的处理版本推送到另一个主题,我正在努力正确地做到这一点。
我的第一次尝试是这样的:
// This is pseudo code.
Source(ProducerSettings(...))
.filter(isProcessable(_))
.map(process(_))
.via(Producer.flow(producerSettings))
.map(_.commitScalaDsl())
.runWith(Sink.ignore)
这种方法的问题是,我只在阅读我能够处理的消息时才提交,这显然不是很酷,因为如果我必须停止并重新启动我的程序,那么我必须重新阅读一堆无用的消息,而且由于它们太多了,我不能那样做。
然后我尝试通过围绕以下几行做一些事情来使用 GraphDSL:
in ~> broadcast ~> isProcessable ~> process ~> producer ~> merge ~> commit
~> broadcast ~> isNotProcessable ~> merge
这个解决方案显然也不好,因为我无法处理的消息会经过图的第二个分支并在可处理的消息真正被推送到它们的目的地之前得到提交,这比第一个更糟糕消息,因为它甚至不能保证至少一次传递。
有人知道我该如何解决这个问题吗?
我之前解决类似问题的一种方法是利用序列号来保证排序。
例如,您可以构建一个类似于您描述的流程保存提交:
in ~> broadcast ~> isProcessable ~> process ~> producer ~> merge ~> out
~> broadcast ~> isNotProcessable ~> merge
然后将其包装成这样的订单保留流程(取自我们在我公司开发的库):OrderPreservingFlow。然后可以将生成的流发送到提交者接收器。
如果您的处理阶段保证顺序,您甚至可以通过将逻辑直接嵌入图表中来提高效率并避免任何缓冲:
in ~> injectSeqNr ~> broadcast ~> isProcessable ~> process ~> producer ~> mergeNextSeqNr ~> commit
~> broadcast ~> isNotProcessable ~> mergeNextSeqNr
这里你的 mergeNextSeqNr 只是一个修改后的合并阶段,如果输入在端口 1 上可用,如果它的序列号是预期的,你会立即发出它,否则你只需等待数据在另一个端口上可用。
最终结果应该与使用上面的流环绕完全相同,但如果嵌入它,您可能会更容易地适应您的需要。
我一直在尝试使用reactive-kafka,但我遇到了条件处理问题,我没有找到满意的答案。
基本上,我正在尝试使用一个包含大量消息(每天大约 100 亿条消息)的 kafka 主题,并且只处理其中一些消息(每天几千条)基于一些 属性 的消息,然后将我的消息的处理版本推送到另一个主题,我正在努力正确地做到这一点。
我的第一次尝试是这样的:
// This is pseudo code.
Source(ProducerSettings(...))
.filter(isProcessable(_))
.map(process(_))
.via(Producer.flow(producerSettings))
.map(_.commitScalaDsl())
.runWith(Sink.ignore)
这种方法的问题是,我只在阅读我能够处理的消息时才提交,这显然不是很酷,因为如果我必须停止并重新启动我的程序,那么我必须重新阅读一堆无用的消息,而且由于它们太多了,我不能那样做。
然后我尝试通过围绕以下几行做一些事情来使用 GraphDSL:
in ~> broadcast ~> isProcessable ~> process ~> producer ~> merge ~> commit
~> broadcast ~> isNotProcessable ~> merge
这个解决方案显然也不好,因为我无法处理的消息会经过图的第二个分支并在可处理的消息真正被推送到它们的目的地之前得到提交,这比第一个更糟糕消息,因为它甚至不能保证至少一次传递。
有人知道我该如何解决这个问题吗?
我之前解决类似问题的一种方法是利用序列号来保证排序。
例如,您可以构建一个类似于您描述的流程保存提交:
in ~> broadcast ~> isProcessable ~> process ~> producer ~> merge ~> out
~> broadcast ~> isNotProcessable ~> merge
然后将其包装成这样的订单保留流程(取自我们在我公司开发的库):OrderPreservingFlow。然后可以将生成的流发送到提交者接收器。
如果您的处理阶段保证顺序,您甚至可以通过将逻辑直接嵌入图表中来提高效率并避免任何缓冲:
in ~> injectSeqNr ~> broadcast ~> isProcessable ~> process ~> producer ~> mergeNextSeqNr ~> commit
~> broadcast ~> isNotProcessable ~> mergeNextSeqNr
这里你的 mergeNextSeqNr 只是一个修改后的合并阶段,如果输入在端口 1 上可用,如果它的序列号是预期的,你会立即发出它,否则你只需等待数据在另一个端口上可用。
最终结果应该与使用上面的流环绕完全相同,但如果嵌入它,您可能会更容易地适应您的需要。