Spark Streaming app gets stuck while writing to and reading from Cassandra simultaneously
I am running some benchmarks with the following data flow:
Kafka --> Spark Streaming --> Cassandra --> Prestodb
Infrastructure: My Spark Streaming application runs on 4 executors (2 cores and 4 GB of memory each). Each executor runs on a data node where Cassandra is also installed. 4 PrestoDB workers are also located on the data nodes. My cluster has 5 nodes, each with an Intel Core i5, 32 GB DDR3 RAM, a 500 GB SSD and a 1 Gigabit network.
Spark Streaming application: My Spark Streaming batch interval is 10 s, and my Kafka producer generates 5000 events every 3 seconds. My streaming application writes to 2 Cassandra tables.
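For reference, here is a minimal sketch of what the streaming job looks like (simplified; the topic, keyspace, table names and parsing logic are placeholders, not my actual code):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
import com.datastax.spark.connector.streaming._

object StreamingBenchmark {
  // Placeholder event type and parser; the real application has its own schema.
  case class Event(id: String, ts: Long, value: Double)
  def parse(raw: String): Event = Event(raw, System.currentTimeMillis(), 0.0)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kafka-to-cassandra")
      .set("spark.cassandra.connection.host", "cassandra-host")   // placeholder host

    // 10 s batch interval, as described above.
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "kafka-host:9092",                  // placeholder broker
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "benchmark")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("events"), kafkaParams))

    // Each micro-batch is parsed once and written to the two Cassandra tables.
    val events = stream.map(record => parse(record.value()))
    events.saveToCassandra("benchmark_ks", "events_raw")
    events.saveToCassandra("benchmark_ks", "events_by_sensor")

    ssc.start()
    ssc.awaitTermination()
  }
}
```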
Context in which everything works fine: Everything runs smoothly; the streaming application is able to process the events and store them in Cassandra. The batch interval is adequate, and the ingestion rate, scheduling delay and processing delay stay almost constant for long periods of time.
Context in which things get messy and confusing: In my benchmark, every hour I run 6 queries against the Cassandra tables. For as long as these queries are running, the Spark Streaming application is no longer able to sustain the write throughput and hangs while writing to Cassandra.
What I have done so far: I searched for this behaviour in other posts (including Stack Overflow), but could not find anything similar. The best suggestion I found was to increase the amount of memory available to Cassandra. I also saw hints related to the connector's fetch size, but I don't know whether that is the issue here, since it only happens when reading and writing at the same time.
Question: Cassandra should not lock writes while reading, right? What do you think is the root cause of the problem I need to solve, and which configurations should I look into?
I attach a print showing the job stuck in the stage that writes to one of the Cassandra tables while I run the benchmark with the 6 queries, as explained above. If you need more information to trace the problem, feel free to ask. I appreciate it!
Thank you very much for your support,
I hope I have asked this question in an appropriate way,
Best regards,
Carlos
I wonder whether this is related to Cassandra at all.
What is your Spark scheduling policy set to? By default it is FIFO, which means a query job can consume all the cores until it finishes, starving the streaming job. If it is FIFO, change it to FAIR and try again.
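A minimal sketch of that change, assuming the queries and the streaming job are submitted from the same Spark application (spark.scheduler.mode only arbitrates between jobs inside one application):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-benchmark")
  .set("spark.scheduler.mode", "FAIR")   // default is FIFO

val ssc = new StreamingContext(conf, Seconds(10))

// Optionally, pin jobs submitted from a given thread to a named pool; pools are
// defined in an XML file referenced by spark.scheduler.allocation.file.
ssc.sparkContext.setLocalProperty("spark.scheduler.pool", "streaming")
```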
This problem looks like it is on the Cassandra/Presto side rather than on the Spark side, for the following reasons/assumptions:
- Since the Spark executors are managed by a resource manager (YARN/Mesos etc.), your queries cannot affect them directly, and while the queries are not running, ingestion works smoothly, as you described.
- Spark-side resource starvation only happens when resources are shared with other components directly. Normally Cassandra and Presto workers/threads do not use resource-manager allocations, so they share resources from the node's point of view rather than the resource manager's.
I suspect the stall happens because:
- During the queries, Cassandra reads a large amount of data, so JVM memory utilization increases and a lot of GC happens. GC pauses could be the reason behind the pauses/stalls.
- The connections to Cassandra (read/write) are fully used up by the queries, so the Spark job cannot insert data and waits in a queue to acquire a connection.
- Overall resource utilization on the nodes has increased, and one of the components may have reached its limit (CPU, memory, disk, etc.). IMO CPU, and in this case also disk, are worth checking.
Verify these causes by monitoring heap utilization and GC logs, and the open connections via JMX for Cassandra, then raise those values based on the available resources to fix the issue, and also try to tune the Presto queries so that the impact is minimal.
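As an illustration of the JMX check (a sketch only; the host name is a placeholder and the client-metrics MBean name may differ between Cassandra versions), heap usage and connected native clients can be read like this:

```scala
import java.lang.management.MemoryUsage
import javax.management.ObjectName
import javax.management.openmbean.CompositeData
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

object CassandraJmxCheck {
  def main(args: Array[String]): Unit = {
    // Cassandra exposes JMX on port 7199 by default; "cassandra-node" is a placeholder.
    val url  = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://cassandra-node:7199/jmxrmi")
    val jmxc = JMXConnectorFactory.connect(url)
    try {
      val mbsc = jmxc.getMBeanServerConnection

      // Heap utilization of the Cassandra JVM.
      val heap = MemoryUsage.from(
        mbsc.getAttribute(new ObjectName("java.lang:type=Memory"), "HeapMemoryUsage")
            .asInstanceOf[CompositeData])
      println(s"heap used: ${heap.getUsed >> 20} MB of ${heap.getMax >> 20} MB")

      // Currently connected native-protocol clients (drivers, connector, Presto).
      val clients = mbsc.getAttribute(
        new ObjectName("org.apache.cassandra.metrics:type=Client,name=connectedNativeClients"),
        "Value")
      println(s"connected native clients: $clients")
    } finally jmxc.close()
  }
}
```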
Presto tuning can be done as a follow-up once the Cassandra issues are confirmed.
More Presto tuning guidance is available at
https://prestodb.io/docs/current/admin/tuning.html
or, if you use the Teradata distribution,
https://teradata.github.io/presto/docs/current/admin/tuning.html
Just to finish this question, and following the advice that was given to me by the Stack Overflow members who were kind enough to reply:
1) I changed the Java garbage collector in Cassandra's jvm.options to G1, which does not require as much tuning as the default CMS. http://docs.datastax.com/en/cassandra/3.0/cassandra/operations/opsTuneJVM.html#opsTuneJVM__setting-g1-gc
I changed it because GC pauses were frequently pointed out as a problem in similar scenarios. To be honest, I'm not too familiar with GC monitoring, but the DataStax Cassandra documentation suggests using G1. I changed it, and I noticed a bit of a performance and stability boost in my workloads and infrastructure.
2) I realised that I was being too optimistic regarding my cluster's performance, and that it was nearly impossible to process and write 20 000 events to Cassandra every 10 seconds while running heavy PrestoDB queries on Cassandra. PrestoDB and Cassandra were consuming almost all of the CPU in my nodes. I only have 4 cores in each node. Cassandra's CPU usage was almost 280%, Presto's almost 90%, and therefore the Spark Streaming executors were starving. Besides, there was no more room for Cassandra to accommodate this write rate, and the Spark Streaming jobs began to hang, accumulating several batches over a long period of time. Therefore, I set the batch interval higher. I know you lose some data timeliness, but if the infrastructure cannot handle it, I don't have the resources to scale :(
Another possibility is to fine-tune the CPU limits per application, for example using Linux cgroups or YARN CPU isolation and queues, and see if it helps. For my cluster, as said before, I think the problem is really trying to "move a mountain with a small remote-controlled car" :) Every node was in charge of Spark Streaming jobs + Cassandra + Presto, with only 4 cores available.
3) Finally, I also tuned the Spark Cassandra connector. This worked for my workload, but I think it will depend on your data and other variables. I changed the following (a short sketch of where these settings go is included after the reference link below):
* the number of concurrent writes from 5 to 2;
* grouping.key from "partition" to "replica_set", because my partition key has high cardinality (the keys are almost unique within an RDD), so grouping batches by partition key was useless. This of course depends on your partition key.
* batch.size.rows to 50. This may depend on the amount of data you have in each streaming micro-batch.
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md
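For reference, a minimal sketch of where these connector settings go (the property names come from the reference linked above; the host is a placeholder and the values are simply the ones that worked for my workload):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-to-cassandra")
  .set("spark.cassandra.connection.host", "cassandra-host")        // placeholder host
  .set("spark.cassandra.output.concurrent.writes", "2")            // down from the default of 5
  .set("spark.cassandra.output.batch.grouping.key", "replica_set") // default is "partition"
  .set("spark.cassandra.output.batch.size.rows", "50")             // default is automatic
```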
I hope this post can help other people facing difficulties in streaming Big Data projects, since things can get very, very difficult sometimes :) If any of my decisions are wrong or misleading, please let me know. Every bit of help is appreciated.
Thank you all,
Best regards,
Carlos Costa