在 Cassandra table 扫描中设置 Spark 任务数

Setting number of Spark tasks on a Cassandra table scan

我有一个简单的 Spark 作业,它从始终运行 6 个任务的 5 节点 Cassandra 集群读取 500m 行,由于每个任务的大小,这会导致写入问题。我试过调整input_split_size,似乎没有效果。目前我不得不重新分区 table 扫描,这并不理想,因为它很昂贵。

阅读了几篇文章后,我尝试增加我的启动脚本(如下)中的执行器数量,尽管这没有效果。

如果无法在 Cassandra table 扫描中设置任务数量,那没关系,我会凑合着做..但我一直有这种烦躁的感觉,觉得我在这里遗漏了一些东西。

Spark 工作人员生活在 C* 节点上,这些节点是 8 核、64gb 服务器,每个服务器都有 2TB SSD。

...
val conf = new SparkConf(true).set("spark.cassandra.connection.host",
cassandraHost).setAppName("rowMigration")
  conf.set("spark.shuffle.memoryFraction", "0.4")
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  conf.set("spark.executor.memory", "15G")
  conf.set("spark.cassandra.input.split.size_in_mb", "32") //default 64mb
  conf.set("spark.cassandra.output.batch.size.bytes", "1000") //default
  conf.set("spark.cassandra.output.concurrent.writes", "5") //default

val sc = new SparkContext(conf)

val rawEvents = sc.cassandraTable(cassandraKeyspace, eventTable)
  .select("accountid", "userid", "eventname", "eventid", "eventproperties")
  .filter(row=>row.getString("accountid").equals("someAccount"))
  .repartition(100)

val object = rawEvents
  .map(ele => (ele.getString("userid"),
    UUID.randomUUID(),
    UUID.randomUUID(),
    ele.getUUID("eventid"),
    ele.getString("eventname"),
    "event type",
    UUIDs.unixTimestamp(ele.getUUID("eventid")),
    ele.getMap[String, String]("eventproperties"),
    Map[String, String](),
    Map[String, String](),
    Map[String, String]()))
  .map(row=>MyObject(row))

Object.saveToCassandra(targetCassandraKeyspace,eventTable)

启动脚本:

#!/bin/bash
export SHADED_JAR="Migrate.jar"
export SPARKHOME="${SPARKHOME:-/opt/spark}"
export SPARK_CLASSPATH="$SHADED_JAR:$SPARK_CLASSPATH"
export CLASS=com.migration.migrate
"${SPARKHOME}/bin/spark-submit" \
        --class "${CLASS}" \
        --jars $SHADED_JAR,$SHADED_JAR \
        --master spark://cas-1-5:7077  \
        --num-executors 15 \
        --executor-memory 20g \
        --executor-cores 4 "$SHADED_JAR" \
        --worker-cores 20 \
        -Dcassandra.connection.host=10.1.20.201 \
        -Dzookeeper.host=10.1.20.211:2181 \

编辑 - 按照 Piotr 的回答:

我在 sc.cassandraTable 上设置了 ReadConf.splitCount 如下,但这不会改变生成的任务数,这意味着我仍然需要重新分区 table 扫描。我开始认为我在考虑这个问题并且重新分区是必要的。目前这项工作大约需要 1.5 小时,并且将 table 扫描重新划分为 1000 个任务,每个任务大约 10MB,将写入时间减少到几分钟。

val cassReadConfig = new ReadConf {
      ReadConf.apply(splitCount = Option(1000)
        )
    }

    val sc = new SparkContext(conf)

    val rawEvents = sc.cassandraTable(cassandraKeyspace, eventTable)
    .withReadConf(readConf = cassReadConfig)

split.size_in_mb 参数似乎有错误。代码可能会将其解释为字节而不是兆字节,因此请尝试将 32 更改为更大的值。请参阅答案中的示例

自 spark connector 1.3 起,拆分大小是根据自 Cassandra 2.1.5 起可用的 system.size_estimates Cassandra table 估算的。此 table 由 Cassandra 定期刷新,并且在 loading/removing 新数据或加入新节点后不久,其内容可能不正确。检查那里的估计是否反映了您的数据量。这是一个相对较新的功能,因此也很可能存在一些错误。

如果估计有误,或者您 运行 年长的 Cassandra,我们保留了覆盖自动拆分大小调整的功能。 sc.cassandraTable 采用 ReadConf 参数,您可以在其中设置 splitCount,这将强制执行固定数量的拆分。

至于split_size_in_mb参数,项目源码确实存在一段时间的bug,但是在发布到maven发布的任何版本之前都已经修复了。因此,除非您从(旧)源代码编译连接器,否则您不应该点击它。