User Defined Function breaks pyspark dataframe
My Spark version is 1.3, and I'm using PySpark.
I have a large DataFrame called df.
from pyspark import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.parquetFile("events.parquet")
Then I select a few columns of the DataFrame and try to count the rows. This works fine.
df3 = df.select("start", "end", "mrt")
print(type(df3))
print(df3.count())
Then I apply a user-defined function to convert one of the columns from a string to a number, which also works fine:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import LongType
CtI = UserDefinedFunction(lambda i: int(i), LongType())
df4 = df2.withColumn("mrt-2", CtI(df2.mrt))
However, if I try to count the rows, I get an exception, even though type() shows that df4 is a DataFrame, just like df3.
print(type(df4))
print(df4.count())
My error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-10-53941e183807> in <module>()
8 df4 = df2.withColumn("mrt-2", CtI(df2.mrt))
9 print(type(df4))
---> 10 print(df4.count())
11 df3 = df4.select("start", "end", "mrt-2").withColumnRenamed("mrt-2", "mrt")
/data/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/spark/python/pyspark/sql/dataframe.py in count(self)
299 2L
300 """
--> 301 return self._jdf.count()
302
303 def collect(self):
/data/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/data/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling o152.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1379 in stage 12.0 failed 4 times, most recent failure: Lost task 1379.3 in stage 12.0 (TID 27021, va1ccogbds01.lab.ctllabs.io): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/data/0/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/spark-assembly-1.3.0-cdh5.4.7-hadoop2.6.0-cdh5.4.7.jar/pyspark/worker.py", line 101, in main
process()
File "/data/0/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/spark-assembly-1.3.0-cdh5.4.7-hadoop2.6.0-cdh5.4.7.jar/pyspark/worker.py", line 96, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/data/0/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/spark-assembly-1.3.0-cdh5.4.7-hadoop2.6.0-cdh5.4.7.jar/pyspark/serializers.py", line 236, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/data/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/spark/python/pyspark/sql/functions.py", line 119, in <lambda>
File "<ipython-input-10-53941e183807>", line 7, in <lambda>
TypeError: int() argument must be a string or a number, not 'NoneType'
at org.apache.spark.api.python.PythonRDD$$anon.read(PythonRDD.scala:135)
at org.apache.spark.api.python.PythonRDD$$anon.next(PythonRDD.scala:98)
at org.apache.spark.api.python.PythonRDD$$anon.next(PythonRDD.scala:94)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at org.apache.spark.rdd.RDD$$anonfun$zip$$anon.hasNext(RDD.scala:743)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$$anonfun.apply(Aggregate.scala:127)
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$$anonfun.apply(Aggregate.scala:124)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:634)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:634)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1210)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1199)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1198)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1198)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1400)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1361)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
---------------------------------------------------------------------------
Am I using the user-defined function correctly? Any idea why the DataFrame functions don't work on this DataFrame?
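From the stack trace (TypeError: int() argument must be a string or a number, not 'NoneType'), it looks like your column contains a None value, which breaks the int conversion. You could try changing the lambda function to lambda i: int(i) if i else None to handle this case.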
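The fix can be checked without a cluster, because the function handed to UserDefinedFunction is ordinary Python. The sketch below compares the question's lambda with the answer's suggested one on a hypothetical sample of "mrt" values (the sample and the names to_long_original and to_long_safe are illustrative, not from the question).

```python
# Conversion functions as they would be passed to UserDefinedFunction.
# Plain Python here, so the logic can be checked without Spark.

def to_long_original(i):
    # The question's lambda: raises TypeError when the value is None (SQL NULL).
    return int(i)


def to_long_safe(i):
    # The answer's suggestion, `int(i) if i else None`:
    # None and the empty string both map to None, which Spark stores as NULL.
    return int(i) if i else None


# Hypothetical string values from the "mrt" column, including a NULL.
sample = ["120", "7", None, ""]

print([to_long_safe(v) for v in sample])

try:
    [to_long_original(v) for v in sample]
except TypeError as exc:
    # Stops at the None element, just as the Spark worker did.
    print("original lambda fails:", exc)
```

In Spark the safe version would be wrapped exactly as in the question, e.g. `CtI = UserDefinedFunction(to_long_safe, LongType())`. Note that a non-numeric string such as "abc" would still raise ValueError; the suggested lambda only covers missing values.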
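Note that just because df2.withColumn("mrt-2", CtI(df2.mrt)) didn't throw an error doesn't mean your code is fine: Spark uses lazy evaluation, so it won't actually try to run your code until you call count, collect, or a similar action.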
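The lazy-evaluation point can be illustrated with a plain Python 3 analogy (this is not Spark itself): the built-in map() returns a lazy iterator, so building it never calls the function, and the TypeError only appears when the results are consumed. The values list is hypothetical.

```python
# Analogy only: Python 3's map() is lazy, much like a Spark transformation.

def to_long(i):
    return int(i)  # same conversion as the question's UDF

values = ["1", "2", None]  # hypothetical column values with one NULL

pipeline = map(to_long, values)  # like df.withColumn(...): nothing runs yet
print("pipeline built without error")

try:
    list(pipeline)  # like df.count(): forces evaluation of every element
except TypeError:
    print("TypeError raised only when the pipeline is consumed")
```

This is why the failure in the question shows up at count() rather than at withColumn().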
Are you using spark-notebook?
I once ran into the same error in spark-notebook, but the same code ran fine with spark-submit:
spark-submit YOURFILE.py