Pyspark - 使用累加器检查 Json 格式

Pyspark - checking Json format using accumulator

如何检查 JSON 文件是否已损坏,例如缺少 {、}、逗号或错误的数据类型。我试图通过使用累加器来实现,因为进程在多个执行程序上运行。

spark_config = SparkConf().setAppName(application_name)
ss = SparkSession.builder.config(conf=spark_config).getOrCreate()

class StringAccumulatorParam(AccumulatorParam):
  def zero(self, v):
      return []
  def addInPlace(self, variable, value):
      variable.append(value)
      return variable
errorCount = ss.sparkContext.accumulator(0)
errorValues = ss.sparkContext.accumulator("", StringAccumulatorParam())

newSchema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
    StructField("status", BooleanType(), True)])

errorDF = ss.read.json("/Users/test.jsonl")
errorDF2 = ss.createDataFrame(errorDF, newSchema).cache()

def checkErrorCount(row):
   global errorCount
   errorDF2["id"] = row. newSchema["id"]
      errorCount.add(1)
      errorValues.add(errorDF2["id"])

errorDF.foreach(lambda x: checkErrorCount(x))
print("{} rows had questionable values.".format(errorCount.value))

ss.stop()

这里是损坏的 JSON 文件 -

{"name":"Standards1","id":90,"status":true}
{"name":"Standards2","id":91
{"name":"Standards3","id":92,"status":true}
{"name":781,"id":93,"status":true}

我玩了一把,想出了以下内容。

在这 2 个解决方案中,我认为计数差异会更快,因为它将使用本机 Spark JSON 处理。

UDF 解决方案将在 Python 中进行 JSON 解析,这意味着您必须支付将每个文件行从 Java 传输到 Python 的成本,因此可能会慢一些。

import json
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, udf
from pyspark.sql.types import LongType

application_name = 'Count bad JSON lines'
spark_config = SparkConf().setAppName(application_name)
ss = SparkSession.builder.config(conf=spark_config).getOrCreate()

# Difference of counts solution
input_path = '/baddata.json'
total_lines = ss.read.text(input_path).count()
good_lines = ss.read.option('mode', 'DROPMALFORMED').json(input_path).count()
bad_lines = total_lines - good_lines
print('Found {} bad JSON lines in data'.format(bad_lines))

# Parse JSON with UDF solution
def is_bad(line):
    try:
        json.loads(line)
        return 0
    except ValueError:
        return 1

is_bad_udf = udf(is_bad, LongType())
lines = ss.read.text(input_path)
bad_sum = lines.select(sum(is_bad_udf('value'))).collect()[0][0]
print('Got {} bad lines'.format(bad_sum))

ss.stop()