检查字符串列并将错误值保存到 Databricks 时内存不足
Out of memory while checking string columns and saving error values to Databricks
我需要在数据框中进行双引号检查。因此,我正在遍历此检查的所有列,但需要花费大量时间。为此,我正在使用 Azure Databricks。
for column in columns_list:
column_name = "`" + column + "`"
df_reject = source_data.withColumn("flag_quotes",when(source_data[column_name].rlike("[\"\"]"),lit("Yes")).otherwise(lit("No")))
df_quo_rejected_df = df_reject.filter(col("flag_quotes") == "Yes")
df_quo_rejected_df = df_quo_rejected_df.withColumn('Error', lit(err))
df_quo_rejected_df.coalesce(1).write.mode("append").option("header","true")\
.option("delimiter",delimiter)\
.format("com.databricks.spark.csv")\
.save(filelocwrite)
我有大约 500 列和 4000 万条记录。我尝试在每次迭代时合并数据帧,但一段时间后该操作确实 OOM。所以我保存数据框并在每次迭代时附加它。请帮助我优化 运行 时间。
您可以尝试使用 exists
.
检查它们的值,而不是遍历列
from pyspark.sql import functions as F
columns_list = [f"`{c}`" for c in columns_list]
df_reject = source_data.filter(F.exists(F.array(*columns_list), lambda x: x.rlike("[\"\"]")))
df_cols_add = df_reject.select('*', F.lit('Yes').alias('flag_quotes'), F.lit(err).alias('Error'))
我需要在数据框中进行双引号检查。因此,我正在遍历此检查的所有列,但需要花费大量时间。为此,我正在使用 Azure Databricks。
for column in columns_list:
column_name = "`" + column + "`"
df_reject = source_data.withColumn("flag_quotes",when(source_data[column_name].rlike("[\"\"]"),lit("Yes")).otherwise(lit("No")))
df_quo_rejected_df = df_reject.filter(col("flag_quotes") == "Yes")
df_quo_rejected_df = df_quo_rejected_df.withColumn('Error', lit(err))
df_quo_rejected_df.coalesce(1).write.mode("append").option("header","true")\
.option("delimiter",delimiter)\
.format("com.databricks.spark.csv")\
.save(filelocwrite)
我有大约 500 列和 4000 万条记录。我尝试在每次迭代时合并数据帧,但一段时间后该操作确实 OOM。所以我保存数据框并在每次迭代时附加它。请帮助我优化 运行 时间。
您可以尝试使用 exists
.
from pyspark.sql import functions as F
columns_list = [f"`{c}`" for c in columns_list]
df_reject = source_data.filter(F.exists(F.array(*columns_list), lambda x: x.rlike("[\"\"]")))
df_cols_add = df_reject.select('*', F.lit('Yes').alias('flag_quotes'), F.lit(err).alias('Error'))