Error when filtering for rows with array size = 1 in PySpark

Previously, the dataframe looked like this:

+----------+--------------------+
|     appId|                lang|
+----------+--------------------+
|1000098520|              ["EN"]|
|1001449696|              ["EN"]|
|1001780528|["AR","ZH","CS","...|
|1001892954|              ["EN"]|
|1001892954|              ["EN"]|
|1001976488|["EN","FR","DE","...|
|1002028916|              ["EN"]|
|1002908393|              ["EN"]|
|1003066972|["EN","FR","DE","...|
|1004217104|              ["EN"]|
|1004552566|              ["EN"]|
|1005192468|              ["EN"]|
|1005488142|["EN","JA","KO","...|
root
 |-- appId: string (nullable = true)
 |-- lang: string (nullable = true)

I tried using json.loads() to convert the string into an array, but I don't think the value is valid JSON. How can I convert it into an array of strings so that the dataframe looks like this?

+----------+--------------------+--------+
|     appId|                lang|len_lang|
+----------+--------------------+--------+
|1000098520|                [EN]|       1|
|1001449696|                [EN]|       1|
|1001780528|[AR, ZH, CS, NL, ...|      25|
|1001892954|                [EN]|       1|
|1001892954|                [EN]|       1|
|1001976488|    [EN, FR, DE, ES]|       4|
|1002028916|                [EN]|       1|
|1002908393|                [EN]|       1|

I have this dataframe. The lang column was originally a string; I converted it to an array type with a UDF that calls json.loads() (roughly as sketched below). Then I want to filter for the appIds that use only 'EN' as a language, i.e. array size == 1 and the only element is 'EN'.
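The conversion was done with a UDF roughly along these lines (a reconstructed sketch: the helper name parse_array_from_string appears in the traceback below, the rest is assumed):

import json
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType

# Reconstructed sketch of the conversion; json.loads raises the
# JSONDecodeError seen below when a value is an empty string or
# otherwise not valid JSON.
def parse_array_from_string(x):
    return json.loads(x)

parse_udf = F.udf(parse_array_from_string, ArrayType(StringType()))
df = df.withColumn('lang', parse_udf(F.col('lang')))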

I tried to write a where() clause using F.size(F.col('lang'))==1 & F.array_contains(F.col('lang','EN'))... but I got this error:

21/11/19 10:07:32 ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/../server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 362, in main
    eval_type = read_int(infile)
  File "/../spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 724, in read_int
    raise EOFError
EOFError

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:260)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 352, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 142, in dump_stream
    for obj in iterator:
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 341, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/var/folders/w1/16mcxl3d3zg1831vc7k73fnh0000gn/T/ipykernel_49586/324191451.py", line 5, in <lambda>
  File "/var/folders/w1/16mcxl3d3zg1831vc7k73fnh0000gn/T/ipykernel_49586/324191451.py", line 2, in parse_array_from_string
  File "/../.pyenv/versions/3.7.9/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/../.pyenv/versions/3.7.9/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/../.pyenv/versions/3.7.9/lib/python3.7/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

Use the from_json function, as in the example below.

import pyspark.sql.functions as F

.....
# Parse the JSON-encoded string into a real array<string>, then append its size
df = (df
      .withColumn('lang', F.expr('from_json(lang, "array<string>")'))
      .select('*', F.size('lang').alias('len_lang')))
df.printSchema()
df.show(truncate=False)
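
With lang now a real array column, the filter asked about in the question can be written directly. Note the parentheses around each condition and that array_contains takes the column and the value as separate arguments (a sketch of the intended filter, not part of the original answer):

import pyspark.sql.functions as F

# Keep only apps whose language list is exactly ['EN']:
# the array has one element and that element is 'EN'.
en_only = df.where((F.size('lang') == 1) & F.array_contains('lang', 'EN'))
en_only.show(truncate=False)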