Spark 数据集选择性重新计算
Spark Dataset selective recompute
我知道 Spark 知道如何分配需要在新节点上完成的工作,例如在另一个节点发生故障的情况下启动的新节点。
我想知道这是否可以用于其他用例。
假设我有一个转换和动作树。当 datasets/dataframes 之一更新时会发生什么(例如导入新文件)。在这种情况下,我只想重复那些受到影响并与此更改相关联的转换和操作。其他不相关的转换和操作应该从缓存中使用,因为它们没有受到影响。
现在,如果我只有少数这些数据框、转换和操作,我可以手动完成。但是我有几十个或更多这样的 DF 和动作,我想了解 spark 是否在框架内内置了一些可以帮助我的东西。
这是我的代码示例:
val carLines = spark
.read
.option("header", "true")
.schema(carLineSchema)
.csv("src/test/resources/cars")
val ageMappingFunction: Int => String = (age: Int) => if (age > 80) "old" else "young"
//
val _age = udf.register("_age", ageMappingFunction)
val personLines = spark
.read
.option("header", "true")
.schema(personLineSchema)
.csv("src/test/resources/persons")
.withColumn("_age", _age($"age"))
val accidentsLines = spark
.read
.option("header", "true")
.schema(accidentLineSchema)
.csv("src/test/resources/accidents")
val carOwners = personLines
.withColumnRenamed("id", "driver_id")
.join(carLines, Seq("driver_id"), "left")
.withColumnRenamed("id", "car_id")
.withColumnRenamed("car_make", "car_maker")
.withColumnRenamed("driver_id", "id")
现在进行一些转换:
val accidentsWithDrivers = accidentsLines
.join(personLines.withColumnRenamed("id", "driver_id"), "driver_id")
val accidentsPerDriverID = accidentsWithDrivers
.groupBy("driver_id")
.agg(Map(
"name" -> "count"
))
.withColumnRenamed("count(name)", "accident_count")
.withColumnRenamed("driver_id", "id")
val finalTable = carOwners
.join(numberOfCarsPerDriver, Seq("id", "name", "age", "_age"))
.join(accidentsPerDriverID, "id")
然后我执行一些操作(为简单起见,我将使用 'show'):
carOwners.show(true)
numberOfCarsPerDriver.show(true)
finalTable.show(true)
所以 - 我要问的是如果 accidentsLines
发生了变化但 carLines
或 personLines
没有发生变化怎么办。我们可以使用 carLines
和 personLines
的缓存值进行 carOwners
转换吗?
换句话说:
我可以以某种方式使用 RDD#cache() api 在不同的驱动程序运行之间生存假设我想将它保存在 spark 集群中的内存中吗?
原来我需要使用 job-server 或使用 Apache Ignite 的 IgniteRDD 支持:
//WRITE
val igniteContext = new IgniteContext(spark.sparkContext, "ignite-config.xml", true)
val schema = dataframe.schema
val rdd = dataframe.rdd
igniteContext.ignite().getOrCreateCache("ignite-cache").put("schema", schema)
igniteContext.fromCache(name).saveValues(rdd)
//READ
val schema = igniteContext.ignite()
.getOrCreateCache[String, StructType]("ignite-cache")
.get("schema")
.asInstanceOf[StructType]
val igniteRdd: IgniteRDD[String, Row] = igniteContext.fromCache(name)
val rdd = igniteRdd.map(a => a._2)
val dataframe = spark.createDataFrame(rdd, schema)
我知道 Spark 知道如何分配需要在新节点上完成的工作,例如在另一个节点发生故障的情况下启动的新节点。
我想知道这是否可以用于其他用例。
假设我有一个转换和动作树。当 datasets/dataframes 之一更新时会发生什么(例如导入新文件)。在这种情况下,我只想重复那些受到影响并与此更改相关联的转换和操作。其他不相关的转换和操作应该从缓存中使用,因为它们没有受到影响。
现在,如果我只有少数这些数据框、转换和操作,我可以手动完成。但是我有几十个或更多这样的 DF 和动作,我想了解 spark 是否在框架内内置了一些可以帮助我的东西。
这是我的代码示例:
val carLines = spark
.read
.option("header", "true")
.schema(carLineSchema)
.csv("src/test/resources/cars")
val ageMappingFunction: Int => String = (age: Int) => if (age > 80) "old" else "young"
//
val _age = udf.register("_age", ageMappingFunction)
val personLines = spark
.read
.option("header", "true")
.schema(personLineSchema)
.csv("src/test/resources/persons")
.withColumn("_age", _age($"age"))
val accidentsLines = spark
.read
.option("header", "true")
.schema(accidentLineSchema)
.csv("src/test/resources/accidents")
val carOwners = personLines
.withColumnRenamed("id", "driver_id")
.join(carLines, Seq("driver_id"), "left")
.withColumnRenamed("id", "car_id")
.withColumnRenamed("car_make", "car_maker")
.withColumnRenamed("driver_id", "id")
现在进行一些转换:
val accidentsWithDrivers = accidentsLines
.join(personLines.withColumnRenamed("id", "driver_id"), "driver_id")
val accidentsPerDriverID = accidentsWithDrivers
.groupBy("driver_id")
.agg(Map(
"name" -> "count"
))
.withColumnRenamed("count(name)", "accident_count")
.withColumnRenamed("driver_id", "id")
val finalTable = carOwners
.join(numberOfCarsPerDriver, Seq("id", "name", "age", "_age"))
.join(accidentsPerDriverID, "id")
然后我执行一些操作(为简单起见,我将使用 'show'):
carOwners.show(true)
numberOfCarsPerDriver.show(true)
finalTable.show(true)
所以 - 我要问的是如果 accidentsLines
发生了变化但 carLines
或 personLines
没有发生变化怎么办。我们可以使用 carLines
和 personLines
的缓存值进行 carOwners
转换吗?
换句话说: 我可以以某种方式使用 RDD#cache() api 在不同的驱动程序运行之间生存假设我想将它保存在 spark 集群中的内存中吗?
原来我需要使用 job-server 或使用 Apache Ignite 的 IgniteRDD 支持:
//WRITE
val igniteContext = new IgniteContext(spark.sparkContext, "ignite-config.xml", true)
val schema = dataframe.schema
val rdd = dataframe.rdd
igniteContext.ignite().getOrCreateCache("ignite-cache").put("schema", schema)
igniteContext.fromCache(name).saveValues(rdd)
//READ
val schema = igniteContext.ignite()
.getOrCreateCache[String, StructType]("ignite-cache")
.get("schema")
.asInstanceOf[StructType]
val igniteRdd: IgniteRDD[String, Row] = igniteContext.fromCache(name)
val rdd = igniteRdd.map(a => a._2)
val dataframe = spark.createDataFrame(rdd, schema)