Kafka + Spark 可扩展性
Kafka + Spark scalability
我们有非常简单的 Spark Streaming 作业(在 Java 中实现),即:
- 通过 DirectStream 从 Kafka 读取 JSON(关闭 Kafka 消息的确认)
- 将 JSON 解析为 POJO(使用 GSON - 我们的消息只有 ~300 字节)
- 将 POJO 映射到键值元组(值 = 对象)
- reduceByKey(自定义 reduce 函数 - 始终比较 1 个字段 - 质量 - 来自对象并留下质量更高的对象实例)
- 将结果存储在状态中(通过 mapWithState 存储每个键质量最高的对象)
- 将结果存储到 HDFS
JSON 是用一组 1000 个 ID(键)生成的,所有事件都随机分布到 Kafka 主题分区。这也意味着,生成的对象集最多为 1000 个,因为作业仅存储每个 ID 质量最高的对象。
我们在 AWS EMR(m4.xlarge = 4 核,16 GB 内存)上进行了 运行 性能测试,参数如下:
- 执行者数量 = 节点数(即每个节点 1 个执行者)
- Kafka 分区数 = 节点数(即在我们的例子中也是执行者)
- 批量大小 = 10(秒)
- 滑动window = 20(s)
- window尺寸=600(s)
- 块大小 = 2000(毫秒)
- 默认并行度 - 尝试了不同的设置,但是当默认并行度 = nodes/executors
时获得最佳结果
Kafka 集群仅包含 1 个代理,在峰值负载期间最多使用 ~30-40%(我们将数据预填充到主题,然后独立执行测试)。我们曾尝试增加num.io.threads和num.network.threads,但没有显着改善。
性能测试(大约 10 分钟的连续负载)的结果是(YARN 主节点和驱动程序节点在以下节点数之上):
- 2 个节点 - 最多能够处理。 150 000 events/s 没有任何处理延迟
- 5 个节点 - 280 000 events/s => 25% 惩罚 如果与预期相比 "almost linear scalability"
- 10 个节点 - 380 000 events/s => 50% 惩罚 如果与预期相比 "almost linear scalability"
2 个节点的 CPU 利用率为 ~
我们还尝试了其他设置,包括:
- 测试 low/high 分区数
- 测试 low/high/defaultParallelism 的默认值
- 使用更多执行者进行测试(即将资源分配给例如 30 个执行者而不是 10 个)
但上面的设置给了我们最好的结果。
所以 - 问题 - Kafka + Spark(几乎)是线性可扩展的吗?如果它的可扩展性比我们的测试显示的更好 - 如何改进它。我们的目标是支持 hundreds/thousands 个 Spark 执行器(即可扩展性对我们至关重要)。
我们已通过以下方式解决此问题:
- 增加Kafka集群容量
- 更多 CPU 功能 - 增加了 Kafka 的节点数量(每 2 个 spark 执行器节点 1 个 Kafka 节点似乎没问题)
- 更多经纪人 - 基本上每个执行人 1 个经纪人给了我们最好的结果
- 设置适当的默认并行度(集群中的核心数 * 2)
- 确保所有节点都有大约。同样的工作量
- batch size/blockSize 应该是〜等于或执行者数量的倍数
最后,我们已经能够实现由具有 10 个执行程序节点的 spark 集群处理 1 100 000 events/s。所做的调整还提高了节点较少的配置的性能 -> 当从 2 个 spark 执行器节点扩展到 10 个(AWS 上的m4.xlarge)时,我们已经实现了几乎线性的可扩展性。
一开始,Kafka节点上的CPU没有接近极限,但是它无法响应Spark执行器的需求。
感谢所有建议,特别是@ArturBiesiadowski,他建议 Kafka 集群大小不正确。
我们有非常简单的 Spark Streaming 作业(在 Java 中实现),即:
- 通过 DirectStream 从 Kafka 读取 JSON(关闭 Kafka 消息的确认)
- 将 JSON 解析为 POJO(使用 GSON - 我们的消息只有 ~300 字节)
- 将 POJO 映射到键值元组(值 = 对象)
- reduceByKey(自定义 reduce 函数 - 始终比较 1 个字段 - 质量 - 来自对象并留下质量更高的对象实例)
- 将结果存储在状态中(通过 mapWithState 存储每个键质量最高的对象)
- 将结果存储到 HDFS
JSON 是用一组 1000 个 ID(键)生成的,所有事件都随机分布到 Kafka 主题分区。这也意味着,生成的对象集最多为 1000 个,因为作业仅存储每个 ID 质量最高的对象。
我们在 AWS EMR(m4.xlarge = 4 核,16 GB 内存)上进行了 运行 性能测试,参数如下:
- 执行者数量 = 节点数(即每个节点 1 个执行者)
- Kafka 分区数 = 节点数(即在我们的例子中也是执行者)
- 批量大小 = 10(秒)
- 滑动window = 20(s)
- window尺寸=600(s)
- 块大小 = 2000(毫秒)
- 默认并行度 - 尝试了不同的设置,但是当默认并行度 = nodes/executors 时获得最佳结果
Kafka 集群仅包含 1 个代理,在峰值负载期间最多使用 ~30-40%(我们将数据预填充到主题,然后独立执行测试)。我们曾尝试增加num.io.threads和num.network.threads,但没有显着改善。
性能测试(大约 10 分钟的连续负载)的结果是(YARN 主节点和驱动程序节点在以下节点数之上):
- 2 个节点 - 最多能够处理。 150 000 events/s 没有任何处理延迟
- 5 个节点 - 280 000 events/s => 25% 惩罚 如果与预期相比 "almost linear scalability"
- 10 个节点 - 380 000 events/s => 50% 惩罚 如果与预期相比 "almost linear scalability"
2 个节点的 CPU 利用率为 ~
我们还尝试了其他设置,包括: - 测试 low/high 分区数 - 测试 low/high/defaultParallelism 的默认值 - 使用更多执行者进行测试(即将资源分配给例如 30 个执行者而不是 10 个) 但上面的设置给了我们最好的结果。
所以 - 问题 - Kafka + Spark(几乎)是线性可扩展的吗?如果它的可扩展性比我们的测试显示的更好 - 如何改进它。我们的目标是支持 hundreds/thousands 个 Spark 执行器(即可扩展性对我们至关重要)。
我们已通过以下方式解决此问题:
- 增加Kafka集群容量
- 更多 CPU 功能 - 增加了 Kafka 的节点数量(每 2 个 spark 执行器节点 1 个 Kafka 节点似乎没问题)
- 更多经纪人 - 基本上每个执行人 1 个经纪人给了我们最好的结果
- 设置适当的默认并行度(集群中的核心数 * 2)
- 确保所有节点都有大约。同样的工作量
- batch size/blockSize 应该是〜等于或执行者数量的倍数
最后,我们已经能够实现由具有 10 个执行程序节点的 spark 集群处理 1 100 000 events/s。所做的调整还提高了节点较少的配置的性能 -> 当从 2 个 spark 执行器节点扩展到 10 个(AWS 上的m4.xlarge)时,我们已经实现了几乎线性的可扩展性。
一开始,Kafka节点上的CPU没有接近极限,但是它无法响应Spark执行器的需求。
感谢所有建议,特别是@ArturBiesiadowski,他建议 Kafka 集群大小不正确。