Kafka - 动态/任意分区
Kafka - Dynamic / Arbitrary Partitioning
我正在为 Kafka 主题构建消费者服务。每条消息都包含一个 url,我的服务将向其发出 http 请求。每条消息 / url 完全独立于其他消息 / urls.
我担心的问题是如何处理long-运行ning请求。某些 http 请求可能需要 50 分钟以上才能返回响应。在那段时间里,我不想再耽误其他消息了。
并行化此操作的最佳方法是什么?
我知道Kafka的并行方式是创建分区。但是,从我读过的内容来看,当我真的想要无限或动态数量的分区时,您似乎需要预先定义分区数量(理想情况下,每条消息都会动态创建自己的分区)
举个例子,假设我创建了 1,000 个分区。如果为我的主题生成了 1,001 多条消息,将发出前 1,000 个请求,但之后的每条消息都将排队,直到该分区中的前一个请求完成。
我曾考虑过使 http 请求异步,但后来我似乎 运行 在确定要提交的偏移量时遇到了问题。
例如,在单个分区上,我可以让消费者读取第一条消息并发出异步请求。它提供了一个回调函数,将偏移量提交给 Kafka。在等待该请求时,我的消费者读取下一条消息并发出另一个异步请求。如果该请求在第一个请求之前完成,它将提交该偏移量。现在,如果第一个请求由于某种原因失败或者我的消费者进程终止了会发生什么?如果我已经提交了更高的偏移量,这听起来意味着我的第一条消息将永远不会被重新处理,这不是我想要的。
当涉及到使用 Kafka 的长运行宁异步消息处理时,我显然遗漏了一些东西。有没有人遇到过类似的问题或想过如何最好地解决这个问题?预先感谢您花时间阅读本文。
您应该将 Apache Storm 用于消费者的处理部分,并将消息存储和检索留给 Kafka。您所描述的是大数据中一个非常常见的用例(尽管 50 分钟以上的时间有点极端)。简而言之,您将为您的主题分配少量分区,并让 Storm 流处理扩展实际发出 http 请求的组件数量("bolts",用 Storm 的话说)。单个 spout(从外部源读取数据的那种风暴组件)可以从 Kafka 主题读取消息并将它们流式传输到处理螺栓。
我已经在 github 上发布了 an open source example 如何编写 Storm/Kafka 应用程序。
这个答案的一些后续想法:
1) 虽然我认为 Storm 是正确的平台方法,但您没有理由不能通过编写执行 http 调用的 Runnable 然后编写更多代码来创建单个 Kafka 消费者来推出自己的方法读取消息并使用可运行的多线程实例处理它们。所需的管理代码有点有趣,但可能比从头开始学习 Storm 更容易编写。因此,您可以通过在更多线程上添加更多 Runnable 实例来进行扩展。
2) 无论你使用Storm还是你自己的多线程解决方案,你仍然会遇到如何在Kafka中管理偏移量的问题。简短的回答是您必须自己进行复杂的偏移量管理。您不仅必须保留从 Kafka 读取的最后一条消息的偏移量,还必须保留和管理当前正在处理的正在处理的消息列表。这样,如果您的应用程序出现故障,您就知道正在处理哪些消息,并且可以在重新启动时检索并重新处理它们。基本的 Kafka 偏移持久性不支持这种更复杂的需求,但它只是为了方便更简单的用例而存在。您可以将偏移量信息保存在任何您喜欢的地方(Zookeeper、文件系统或任何数据库)。
我正在为 Kafka 主题构建消费者服务。每条消息都包含一个 url,我的服务将向其发出 http 请求。每条消息 / url 完全独立于其他消息 / urls.
我担心的问题是如何处理long-运行ning请求。某些 http 请求可能需要 50 分钟以上才能返回响应。在那段时间里,我不想再耽误其他消息了。
并行化此操作的最佳方法是什么?
我知道Kafka的并行方式是创建分区。但是,从我读过的内容来看,当我真的想要无限或动态数量的分区时,您似乎需要预先定义分区数量(理想情况下,每条消息都会动态创建自己的分区)
举个例子,假设我创建了 1,000 个分区。如果为我的主题生成了 1,001 多条消息,将发出前 1,000 个请求,但之后的每条消息都将排队,直到该分区中的前一个请求完成。
我曾考虑过使 http 请求异步,但后来我似乎 运行 在确定要提交的偏移量时遇到了问题。
例如,在单个分区上,我可以让消费者读取第一条消息并发出异步请求。它提供了一个回调函数,将偏移量提交给 Kafka。在等待该请求时,我的消费者读取下一条消息并发出另一个异步请求。如果该请求在第一个请求之前完成,它将提交该偏移量。现在,如果第一个请求由于某种原因失败或者我的消费者进程终止了会发生什么?如果我已经提交了更高的偏移量,这听起来意味着我的第一条消息将永远不会被重新处理,这不是我想要的。
当涉及到使用 Kafka 的长运行宁异步消息处理时,我显然遗漏了一些东西。有没有人遇到过类似的问题或想过如何最好地解决这个问题?预先感谢您花时间阅读本文。
您应该将 Apache Storm 用于消费者的处理部分,并将消息存储和检索留给 Kafka。您所描述的是大数据中一个非常常见的用例(尽管 50 分钟以上的时间有点极端)。简而言之,您将为您的主题分配少量分区,并让 Storm 流处理扩展实际发出 http 请求的组件数量("bolts",用 Storm 的话说)。单个 spout(从外部源读取数据的那种风暴组件)可以从 Kafka 主题读取消息并将它们流式传输到处理螺栓。
我已经在 github 上发布了 an open source example 如何编写 Storm/Kafka 应用程序。
这个答案的一些后续想法:
1) 虽然我认为 Storm 是正确的平台方法,但您没有理由不能通过编写执行 http 调用的 Runnable 然后编写更多代码来创建单个 Kafka 消费者来推出自己的方法读取消息并使用可运行的多线程实例处理它们。所需的管理代码有点有趣,但可能比从头开始学习 Storm 更容易编写。因此,您可以通过在更多线程上添加更多 Runnable 实例来进行扩展。
2) 无论你使用Storm还是你自己的多线程解决方案,你仍然会遇到如何在Kafka中管理偏移量的问题。简短的回答是您必须自己进行复杂的偏移量管理。您不仅必须保留从 Kafka 读取的最后一条消息的偏移量,还必须保留和管理当前正在处理的正在处理的消息列表。这样,如果您的应用程序出现故障,您就知道正在处理哪些消息,并且可以在重新启动时检索并重新处理它们。基本的 Kafka 偏移持久性不支持这种更复杂的需求,但它只是为了方便更简单的用例而存在。您可以将偏移量信息保存在任何您喜欢的地方(Zookeeper、文件系统或任何数据库)。