从配置单元 table 读取并在 pyspark 中更新相同的 table - 使用检查点

reading from hive table and updating same table in pyspark - using checkpoint

我正在使用 spark 2.3 版并尝试在 spark 中读取配置单元 table 为:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
df = spark.table("emp.emptable")

我在这里添加一个新列,其中包含来自系统的当前日期到现有数据框

import pyspark.sql.functions as F
newdf = df.withColumn('LOAD_DATE', F.current_date())

现在面临一个问题,当我尝试将此数据框编写为配置单元时 table

newdf.write.mode("overwrite").saveAsTable("emp.emptable")

pyspark.sql.utils.AnalysisException: u'Cannot overwrite table emp.emptable that is also being read from;'

所以我正在检查数据帧以打破血统,因为我正在从同一个数据帧读取和写入

checkpointDir = "/hdfs location/temp/tables/"
spark.sparkContext.setCheckpointDir(checkpointDir)
df = spark.table("emp.emptable").coalesce(1).checkpoint()
newdf = df.withColumn('LOAD_DATE', F.current_date())
newdf.write.mode("overwrite").saveAsTable("emp.emptable")

这种方式工作正常,新列已添加到配置单元 table。但每次创建检查点文件时我都必须删除它。有没有最好的方法来打破谱系并使用更新的列详细信息编写相同的数据框并将其保存到 hdfs 位置或作为配置单元 table.

或者有什么方法可以为检查点目录指定一个临时位置,它将被删除post spark 会话完成。

正如我们在 post 中讨论的那样,低于 属性 的设置是可行的。

spark.conf.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")

那个问题有不同的背景。我们想保留 checkpointed 数据集,所以不关心添加清理解决方案。

上面 属性 的设置是 有时工作 (测试 scala,java 和 python ) 但很难依赖它。官方文档说通过设置这个 属性 it Controls whether to clean checkpoint files if the reference is out of scope. 我不知道具体是什么意思,因为我的理解是一旦 spark session/context 停止它应该清理它。如果有人能照亮它就太好了。

关于

Is there any best way to break the lineage

检查问题,@BiS找到了使用createDataFrame(RDD, Schema)方法切割血统的方法。不过我还没有亲自测试过。

仅供参考,我 通常不依赖上述 属性 并删除代码本身的 checkpointed 目录以确保安全。

我们可以得到如下的checkpointed目录:

斯卡拉:

//Set directory
scala> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoint/")

scala> spark.sparkContext.getCheckpointDir.get
res3: String = hdfs://<name-node:port>/tmp/checkpoint/625034b3-c6f1-4ab2-9524-e48dfde589c3

//It gives String so we can use org.apache.hadoop.fs to delete path 

PySpark:

// Set directory
>>> spark.sparkContext.setCheckpointDir('hdfs:///tmp/checkpoint')
>>> t = sc._jsc.sc().getCheckpointDir().get()
>>> t 
u'hdfs://<name-node:port>/tmp/checkpoint/dc99b595-f8fa-4a08-a109-23643e2325ca'

# notice 'u' at the start which means It returns unicode object use str(t)
# Below are the steps to get hadoop file system object and delete

>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

>>> fs.delete(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
False