dask dataframe to spark not working the same way as pandas dataframe to spark
I am reading some wind speed data from netcdf files. This produces an xarray dataset that I can convert to a pandas and/or dask dataframe. Ultimately, because of the size of the data, I want to convert to a dask dataframe first and then to pyspark. However, when converting from dask to spark I get an error, while the equivalent pandas-to-spark conversion works fine. See the code below.
Pandas first
df=u10.isel(time0=0).to_dataframe().reset_index()
print(df.head())
print("=======================================================")
print(df.dtypes)
print("=======================================================")
pyspark_df = spark.createDataFrame(df)
pyspark_df.show(5)
lon lat time0 eastward_wind_at_100_metres \
0 0.0 90.00 2021-03-01 -0.375
1 0.0 89.75 2021-03-01 -7.000
2 0.0 89.50 2021-03-01 -7.125
3 0.0 89.25 2021-03-01 -7.250
4 0.0 89.00 2021-03-01 -7.250
eastward_wind_at_10_metres northward_wind_at_100_metres \
0 -0.1875 -0.1250
1 -5.0625 0.7500
2 -5.1875 0.8125
3 -5.3750 1.0625
4 -6.0000 1.5625
northward_wind_at_10_metres
0 0.0000
1 0.0625
2 0.1250
3 0.3750
4 1.1250
=======================================================
lon float64
lat float64
time0 datetime64[ns]
eastward_wind_at_100_metres float32
eastward_wind_at_10_metres float32
northward_wind_at_100_metres float32
northward_wind_at_10_metres float32
dtype: object
=======================================================
+---+-----+-------------------+---------------------------+--------------------------+----------------------------+---------------------------+
|lon| lat| time0|eastward_wind_at_100_metres|eastward_wind_at_10_metres|northward_wind_at_100_metres|northward_wind_at_10_metres|
+---+-----+-------------------+---------------------------+--------------------------+----------------------------+---------------------------+
|0.0| 90.0|2021-03-01 00:00:00| -0.375| -0.1875| -0.125| 0.0|
|0.0|89.75|2021-03-01 00:00:00| -7.0| -5.0625| 0.75| 0.0625|
|0.0| 89.5|2021-03-01 00:00:00| -7.125| -5.1875| 0.8125| 0.125|
|0.0|89.25|2021-03-01 00:00:00| -7.25| -5.375| 1.0625| 0.375|
|0.0| 89.0|2021-03-01 00:00:00| -7.25| -6.0| 1.5625| 1.125|
+---+-----+-------------------+---------------------------+--------------------------+----------------------------+---------------------------+
only showing top 5 rows
Now dask
ddf=u10.isel(time0=0).to_dask_dataframe()
print(ddf.head())
print("=======================================================")
print(ddf.dtypes)
print("=======================================================")
pyspark_df = spark.createDataFrame(ddf)
pyspark_df.show(5)
lon lat time0 eastward_wind_at_100_metres \
0 0.0 90.00 2021-03-01 -0.375
1 0.0 89.75 2021-03-01 -7.000
2 0.0 89.50 2021-03-01 -7.125
3 0.0 89.25 2021-03-01 -7.250
4 0.0 89.00 2021-03-01 -7.250
eastward_wind_at_10_metres northward_wind_at_100_metres \
0 -0.1875 -0.1250
1 -5.0625 0.7500
2 -5.1875 0.8125
3 -5.3750 1.0625
4 -6.0000 1.5625
northward_wind_at_10_metres
0 0.0000
1 0.0625
2 0.1250
3 0.3750
4 1.1250
=======================================================
lon float32
lat float32
time0 datetime64[ns]
eastward_wind_at_100_metres float32
eastward_wind_at_10_metres float32
northward_wind_at_100_metres float32
northward_wind_at_10_metres float32
dtype: object
=======================================================
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_18808/850613924.py in <module>
4 print(ddf.dtypes)
5 print("=======================================================")
----> 6 pyspark_df = spark.createDataFrame(ddf)
7 pyspark_df.show(5)
...
...
...
1063
1064 else:
-> 1065 raise TypeError("Can not infer schema for type: %s" % type(row))
1066
1067 fields = []
TypeError: Can not infer schema for type: <class 'str'>
It would be great if someone could explain why this happens and suggest a good solution.
Added a schema to the createDataFrame call:
from pyspark.sql.types import StructType, StructField, FloatType, TimestampType

fields = [
    StructField("lon", FloatType(), True),
    StructField("lat", FloatType(), True),
    StructField("time0", TimestampType(), True),
    StructField("eastward_wind_at_100_metres", FloatType(), True),
    StructField("eastward_wind_at_10_metres", FloatType(), True),
    StructField("northward_wind_at_100_metres", FloatType(), True),
    StructField("northward_wind_at_10_metres", FloatType(), True),
]
schema = StructType(fields)
ddf=u10.isel(time0=0).to_dask_dataframe()
print(ddf.head())
print("=======================================================")
print(ddf.dtypes)
print("=======================================================")
pyspark_df = spark.createDataFrame(ddf,schema)
pyspark_df.show(10)
lon lat time0 eastward_wind_at_100_metres \
0 0.0 90.00 2021-03-01 -0.375
1 0.0 89.75 2021-03-01 -7.000
2 0.0 89.50 2021-03-01 -7.125
3 0.0 89.25 2021-03-01 -7.250
4 0.0 89.00 2021-03-01 -7.250
eastward_wind_at_10_metres northward_wind_at_100_metres \
0 -0.1875 -0.1250
1 -5.0625 0.7500
2 -5.1875 0.8125
3 -5.3750 1.0625
4 -6.0000 1.5625
northward_wind_at_10_metres
0 0.0000
1 0.0625
2 0.1250
3 0.3750
4 1.1250
=======================================================
lon float32
lat float32
time0 datetime64[ns]
eastward_wind_at_100_metres float32
eastward_wind_at_10_metres float32
northward_wind_at_100_metres float32
northward_wind_at_10_metres float32
dtype: object
=======================================================
...
...
~\Anaconda3\lib\site-packages\pyspark\sql\types.py in verify_struct(obj)
1394 verifier(d.get(f))
1395 else:
-> 1396 raise TypeError(new_msg("StructType can not accept object %r in type %s"
1397 % (obj, type(obj))))
1398 verify_value = verify_struct
TypeError: StructType can not accept object 'lon' in type <class 'str'>
Full traceback:
TypeError Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_5368/3996125343.py in <module>
15 print(ddf.dtypes)
16 print("=======================================================")
---> 17 pyspark_df = spark.createDataFrame(ddf,schema)
18 pyspark_df.show(10)
~\Anaconda3\lib\site-packages\pyspark\sql\session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
673 return super(SparkSession, self).createDataFrame(
674 data, schema, samplingRatio, verifySchema)
--> 675 return self._create_dataframe(data, schema, samplingRatio, verifySchema)
676
677 def _create_dataframe(self, data, schema, samplingRatio, verifySchema):
~\Anaconda3\lib\site-packages\pyspark\sql\session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
698 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
699 else:
--> 700 rdd, schema = self._createFromLocal(map(prepare, data), schema)
701 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
702 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
~\Anaconda3\lib\site-packages\pyspark\sql\session.py in _createFromLocal(self, data, schema)
507 # make sure data could consumed multiple times
508 if not isinstance(data, list):
--> 509 data = list(data)
510
511 if schema is None or isinstance(schema, (list, tuple)):
~\Anaconda3\lib\site-packages\pyspark\sql\session.py in prepare(obj)
680
681 def prepare(obj):
--> 682 verify_func(obj)
683 return obj
684 elif isinstance(schema, DataType):
~\Anaconda3\lib\site-packages\pyspark\sql\types.py in verify(obj)
1407 def verify(obj):
1408 if not verify_nullability(obj):
-> 1409 verify_value(obj)
1410
1411 return verify
~\Anaconda3\lib\site-packages\pyspark\sql\types.py in verify_struct(obj)
1394 verifier(d.get(f))
1395 else:
-> 1396 raise TypeError(new_msg("StructType can not accept object %r in type %s"
1397 % (obj, type(obj))))
1398 verify_value = verify_struct
TypeError: StructType can not accept object 'lon' in type <class 'str'>
Aside: are you sure you want to process this data with Spark? Dask integrates well with xarray, and you may find that the conversion isn't worth it, especially since you would need to copy and convert the data and keep two cluster systems running.
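For example, here is a minimal sketch of staying in the dask/xarray ecosystem; the variable name is taken from the dtype listing above, and the aggregation is purely illustrative:

# Sketch: reduce the data directly with xarray + dask, without Spark.
# The mean over lon/lat is only an example of computing on the lazy arrays.
mean_u100 = (
    u10.isel(time0=0)["eastward_wind_at_100_metres"]
       .mean(dim=["lon", "lat"])
       .compute()
)
print(mean_u100)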
Short answer: createDataFrame supports pandas exactly, not "pandas-like objects" such as dask dataframes. From the docstring:
data : :class:`RDD` or iterable
an RDD of any kind of SQL data representation (:class:`Row`,
:class:`tuple`, ``int``, ``boolean``, etc.), or :class:`list`, or
:class:`pandas.DataFrame`.
A dask dataframe is made up of lazy partitions, but you are asking Spark to ship them to the Spark workers. That is not a simple operation! Since the object is not a pandas.DataFrame, Spark falls back to treating it as a generic iterable of rows; iterating over a dataframe yields its column names as strings, which is why schema inference fails with <class 'str'>. You could decide to create a Spark dataframe for each dask partition and concatenate them on the Spark side - that may achieve what you are after, but it won't be efficient.
import functools
import dask
import pyspark.sql
to_spark_df = dask.delayed(spark.createDataFrame)
pieces = dask.compute(*[to_spark_df(d) for d in ddf.to_delayed()])
spark_df = functools.reduce(pyspark.sql.DataFrame.unionAll, pieces)
(Note that this applies union in a chain; there does not seem to be a simple concat operation in Spark.)
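If the slice you select is small enough to collect on the driver, a simpler alternative (just a sketch, assuming ddf.compute() yields a pandas DataFrame that fits in driver memory) is to materialize the dask dataframe before handing it to Spark, since createDataFrame supports pandas directly:

# Sketch: materialize the dask dataframe to pandas, then pass the pandas
# DataFrame to Spark (createDataFrame accepts pandas.DataFrame directly).
pdf = ddf.compute()
pyspark_df = spark.createDataFrame(pdf)
pyspark_df.show(5)

This gives up dask's parallelism for the conversion step, so it only makes sense when each selected slice is modest in size.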