检查字符串列并将错误值保存到 Databricks 时内存不足

Out of memory while checking string columns and saving error values to Databricks

我需要在数据框中进行双引号检查。因此,我正在遍历此检查的所有列,但需要花费大量时间。为此,我正在使用 Azure Databricks。

for column in columns_list:
      column_name = "`" + column + "`"
      df_reject = source_data.withColumn("flag_quotes",when(source_data[column_name].rlike("[\"\"]"),lit("Yes")).otherwise(lit("No")))     
      df_quo_rejected_df = df_reject.filter(col("flag_quotes") == "Yes") 
      
     
      df_quo_rejected_df = df_quo_rejected_df.withColumn('Error', lit(err))
      df_quo_rejected_df.coalesce(1).write.mode("append").option("header","true")\
                  .option("delimiter",delimiter)\
                  .format("com.databricks.spark.csv")\
                  .save(filelocwrite)

我有大约 500 列和 4000 万条记录。我尝试在每次迭代时合并数据帧,但一段时间后该操作确实 OOM。所以我保存数据框并在每次迭代时附加它。请帮助我优化 运行 时间。

您可以尝试使用 exists.

检查它们的值,而不是遍历列
from pyspark.sql import functions as F

columns_list = [f"`{c}`" for c in columns_list]
df_reject = source_data.filter(F.exists(F.array(*columns_list), lambda x: x.rlike("[\"\"]")))
df_cols_add = df_reject.select('*', F.lit('Yes').alias('flag_quotes'), F.lit(err).alias('Error'))