将 dataframe 从 spark 集群写入 cassandra 集群:分区和性能调优

Write dataframe from spark cluster to cassandra cluster: Partitioning and Performance Tuning

我有两个集群 - 1. Cloudera Hadoop- Spark 作业 运行 在这里 2. Cloud - Cassandra集群,多个DC

在将我的 spark 作业的数据帧写入 cassandra 集群时,我在写入之前在 spark 中进行了重新分区 (repartionCount=10)。见下文:

import org.apache.spark.sql.cassandra._
records.repartition(repartitionCount).write.cassandraFormat(table, keySpace)
  .mode(SaveMode.Append)
  .options(options)
  .option(CassandraConnectorConf.LocalDCParam.name, cassandraDC.name)
  .option(CassandraConnectorConf.ConnectionHostParam.name, cassandraDC.hosts)
  .save()

在我的多租户 spark 集群中,对于具有 20M 记录的 spark 批处理加载和低于配置,我看到很多任务失败、资源抢占和即时失败。

spark.cassandra.output.batch.grouping.buffer.size=1000
spark.cassandra.output.batch.grouping.key=partition
spark.cassandra.output.concurrent.writes=20 
spark.cassandra.connection.compression=LZ4

我应该如何调整它?重新分区是罪魁祸首吗?

PS:一开始我的理解是:对于20M行的负载,"repartition"应该将负载平均分配给执行器(每个分区2M行),然后进行批处理在这些分区级别上(在 2M 行上)。但是现在,如果 spark-cassandra-connector 在整个数据帧级别(整个 20M 行)进行批处理,我怀疑这是否会导致不必要的洗牌。

更新:删除 "repartition" 大大降低了我的 cloudera spark 集群的性能(在 spark 级别设置的默认分区是 - spark.sql.shuffle.partitions: 200),所以我深入挖掘并发现了我的初步理解是正确的。请注意我的 spark 和 cassandra 集群是不同的。 Datastax spark-cassandra-connector 使用 cassandra 协调器节点为每个分区打开一个连接,因此我决定让它保持不变。正如亚历克斯所建议的那样,我减少了并发写入,我相信这应该有所帮助。

您不需要在 Spark 中进行重新分区 - 只需将数据从 Spark 写入 Cassandra,不要尝试更改 Spark Cassandra 连接器默认设置 - 它们在大多数情况下都可以正常工作。您需要查看发生了什么样的阶段故障 - 很可能您只是因为 spark.cassandra.output.concurrent.writes=20(使用默认值 (5))而使 Cassandra 过载 - 有时较少的编写器有助于更快地写入数据不要使 Cassandra 超载,作业不会重新启动。

P.S。 spark.cassandra.output.batch.grouping.key 中的 partition - 它不是 Spark 分区,它是 Cassandra 分区,它取决于分区键列的值。