运行 来自同一 JVM 的 kafka 消费者和生产者时生产者速度慢
Slow producer when running kafka consumer and producer from same JVM
我正在使用 kafka 0.8 和 spring-integration-kafka 1.2。0.RELEASE
我有 2 个主题,分别名为主要主题和次要主题。我需要使用主要主题,经过一些处理后需要生成次要主题,以便稍后进行下一组处理。
虽然主要主题的消费工作正常,但生产到次要主题在几分钟后开始失败。问题始于我设置的 500 毫秒后发送请求到 kafka 超时。以线程池耗尽结束。
如果我尝试将事件生成到另一个 kafka 集群的次要主题,它可以正常工作。
我有 4 个消费者 运行 两个主题各有 200 个分区。
我是kafka的新手,知识不足请见谅。
请评论我应该提供的任何遗漏信息。
根据所提供的信息很难知道,但我怀疑这个问题是您可以从第一个主题消费然后计算结果的速度比生成第二个主题的速度更快。发生这种情况的原因可能有很多。例如,对次要主题的写入可能没有很好地跨分区分布。同样,由于各种原因,包括更快的机器、更多的机器、更好的网络等,生产到不同的集群可能会成功。
基本问题并不是 Kafka 特有的:如果您从一个来源使用数据并将该数据发送到第二个接收器,您通常不能假设第二个接收器总是比第一个来源更快。每当第二个接收器变慢时,即使是一点点,你最终也会遇到这样的问题。例如,假设您可以从主接收器读取 100 events/second,但辅助接收器只能消耗 99 events/second。这意味着每一秒你都会在内存中多一个事件等待发送到你的接收器。如果您不采取任何措施来降低从主要来源读取的速度,您将 运行 内存、线程或其他一些资源不足。
一般的解决办法是某种节流。例如,您可以使用以 500 个许可开头的 Semaphore
:这意味着您永远无法从尚未成功发送到接收器的主要来源读取超过 500 个项目。在从主要来源读取项目之前,您会减少 Semaphore
,这样如果您已经 "ahead of" 次要来源减少 500 个项目,您的 reader 将被阻止。每次您成功将一个项目发送到您的次要主题时,您就会释放一个许可,允许继续进行另一次阅读。
我会警告不要使用第二个 Kafka 集群或其他有效但不能真正解决核心问题的方法。例如,如果现在可以生产到不同的集群,那么当该集群由于节点丢失、大的重新平衡等而变慢时,它就不会了。这只是暂时隐藏了问题。
尝试了所有可能的配置后终于找到了问题。
错误地忘记删除之前为消费者集成添加的以下依赖项。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
它在生产时引起了一些冲突,正在等待状态下添加线程。如果有人可以指导它可以添加什么冲突将是一个很好的学习。
谢谢。
我正在使用 kafka 0.8 和 spring-integration-kafka 1.2。0.RELEASE
我有 2 个主题,分别名为主要主题和次要主题。我需要使用主要主题,经过一些处理后需要生成次要主题,以便稍后进行下一组处理。
虽然主要主题的消费工作正常,但生产到次要主题在几分钟后开始失败。问题始于我设置的 500 毫秒后发送请求到 kafka 超时。以线程池耗尽结束。
如果我尝试将事件生成到另一个 kafka 集群的次要主题,它可以正常工作。
我有 4 个消费者 运行 两个主题各有 200 个分区。
我是kafka的新手,知识不足请见谅。 请评论我应该提供的任何遗漏信息。
根据所提供的信息很难知道,但我怀疑这个问题是您可以从第一个主题消费然后计算结果的速度比生成第二个主题的速度更快。发生这种情况的原因可能有很多。例如,对次要主题的写入可能没有很好地跨分区分布。同样,由于各种原因,包括更快的机器、更多的机器、更好的网络等,生产到不同的集群可能会成功。
基本问题并不是 Kafka 特有的:如果您从一个来源使用数据并将该数据发送到第二个接收器,您通常不能假设第二个接收器总是比第一个来源更快。每当第二个接收器变慢时,即使是一点点,你最终也会遇到这样的问题。例如,假设您可以从主接收器读取 100 events/second,但辅助接收器只能消耗 99 events/second。这意味着每一秒你都会在内存中多一个事件等待发送到你的接收器。如果您不采取任何措施来降低从主要来源读取的速度,您将 运行 内存、线程或其他一些资源不足。
一般的解决办法是某种节流。例如,您可以使用以 500 个许可开头的 Semaphore
:这意味着您永远无法从尚未成功发送到接收器的主要来源读取超过 500 个项目。在从主要来源读取项目之前,您会减少 Semaphore
,这样如果您已经 "ahead of" 次要来源减少 500 个项目,您的 reader 将被阻止。每次您成功将一个项目发送到您的次要主题时,您就会释放一个许可,允许继续进行另一次阅读。
我会警告不要使用第二个 Kafka 集群或其他有效但不能真正解决核心问题的方法。例如,如果现在可以生产到不同的集群,那么当该集群由于节点丢失、大的重新平衡等而变慢时,它就不会了。这只是暂时隐藏了问题。
尝试了所有可能的配置后终于找到了问题。
错误地忘记删除之前为消费者集成添加的以下依赖项。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
它在生产时引起了一些冲突,正在等待状态下添加线程。如果有人可以指导它可以添加什么冲突将是一个很好的学习。
谢谢。