Dask dataframe to Spark does not work the same way as pandas dataframe to Spark

I am reading some wind speed data from netcdf files. This produces an xarray Dataset, which I can convert to a pandas and/or dask dataframe. Ultimately, because of the size of the data, I want to convert to a dask dataframe first and then to pyspark. However, when converting from dask to Spark I get an error, whereas the equivalent pandas-to-Spark conversion works fine. See the code below.

Pandas first

df=u10.isel(time0=0).to_dataframe().reset_index()
print(df.head())
print("=======================================================")
print(df.dtypes)
print("=======================================================")
pyspark_df = spark.createDataFrame(df) 
pyspark_df.show(5)


   lon    lat      time0  eastward_wind_at_100_metres  \
0  0.0  90.00 2021-03-01                       -0.375   
1  0.0  89.75 2021-03-01                       -7.000   
2  0.0  89.50 2021-03-01                       -7.125   
3  0.0  89.25 2021-03-01                       -7.250   
4  0.0  89.00 2021-03-01                       -7.250   

   eastward_wind_at_10_metres  northward_wind_at_100_metres  \
0                     -0.1875                       -0.1250   
1                     -5.0625                        0.7500   
2                     -5.1875                        0.8125   
3                     -5.3750                        1.0625   
4                     -6.0000                        1.5625   

   northward_wind_at_10_metres  
0                       0.0000  
1                       0.0625  
2                       0.1250  
3                       0.3750  
4                       1.1250  
=======================================================
lon                                    float64
lat                                    float64
time0                           datetime64[ns]
eastward_wind_at_100_metres            float32
eastward_wind_at_10_metres             float32
northward_wind_at_100_metres           float32
northward_wind_at_10_metres            float32
dtype: object
=======================================================
+---+-----+-------------------+---------------------------+--------------------------+----------------------------+---------------------------+
|lon|  lat|              time0|eastward_wind_at_100_metres|eastward_wind_at_10_metres|northward_wind_at_100_metres|northward_wind_at_10_metres|
+---+-----+-------------------+---------------------------+--------------------------+----------------------------+---------------------------+
|0.0| 90.0|2021-03-01 00:00:00|                     -0.375|                   -0.1875|                      -0.125|                        0.0|
|0.0|89.75|2021-03-01 00:00:00|                       -7.0|                   -5.0625|                        0.75|                     0.0625|
|0.0| 89.5|2021-03-01 00:00:00|                     -7.125|                   -5.1875|                      0.8125|                      0.125|
|0.0|89.25|2021-03-01 00:00:00|                      -7.25|                    -5.375|                      1.0625|                      0.375|
|0.0| 89.0|2021-03-01 00:00:00|                      -7.25|                      -6.0|                      1.5625|                      1.125|
+---+-----+-------------------+---------------------------+--------------------------+----------------------------+---------------------------+
only showing top 5 rows

Now the dask version

ddf=u10.isel(time0=0).to_dask_dataframe()
print(ddf.head())
print("=======================================================")
print(ddf.dtypes)
print("=======================================================")
pyspark_df = spark.createDataFrame(ddf) 
pyspark_df.show(5)

 lon    lat      time0  eastward_wind_at_100_metres  \
0  0.0  90.00 2021-03-01                       -0.375   
1  0.0  89.75 2021-03-01                       -7.000   
2  0.0  89.50 2021-03-01                       -7.125   
3  0.0  89.25 2021-03-01                       -7.250   
4  0.0  89.00 2021-03-01                       -7.250   

   eastward_wind_at_10_metres  northward_wind_at_100_metres  \
0                     -0.1875                       -0.1250   
1                     -5.0625                        0.7500   
2                     -5.1875                        0.8125   
3                     -5.3750                        1.0625   
4                     -6.0000                        1.5625   

   northward_wind_at_10_metres  
0                       0.0000  
1                       0.0625  
2                       0.1250  
3                       0.3750  
4                       1.1250  
=======================================================
lon                                    float32
lat                                    float32
time0                           datetime64[ns]
eastward_wind_at_100_metres            float32
eastward_wind_at_10_metres             float32
northward_wind_at_100_metres           float32
northward_wind_at_10_metres            float32
dtype: object
=======================================================
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_18808/850613924.py in <module>
      4 print(ddf.dtypes)
      5 print("=======================================================")
----> 6 pyspark_df = spark.createDataFrame(ddf)
      7 pyspark_df.show(5)
...
...
...
  1063 
   1064     else:
-> 1065         raise TypeError("Can not infer schema for type: %s" % type(row))
   1066 
   1067     fields = []

TypeError: Can not infer schema for type: <class 'str'>

It would be great if someone could explain why this happens and suggest a good solution.

Added a schema to the createDataFrame call:

from pyspark.sql.types import StructType, StructField, FloatType, TimestampType

fields = [
    StructField("lon", FloatType(), True),
    StructField("lat", FloatType(), True),
    StructField("time0", TimestampType(), True),
    StructField("eastward_wind_at_100_metres", FloatType(), True),
    StructField("eastward_wind_at_10_metres", FloatType(), True),
    StructField("northward_wind_at_100_metres", FloatType(), True),
    StructField("northward_wind_at_10_metres", FloatType(), True),
]

schema = StructType(fields)

ddf=u10.isel(time0=0).to_dask_dataframe()
print(ddf.head())
print("=======================================================")
print(ddf.dtypes)
print("=======================================================")
pyspark_df = spark.createDataFrame(ddf,schema) 
pyspark_df.show(10)

  lon    lat      time0  eastward_wind_at_100_metres  \
0  0.0  90.00 2021-03-01                       -0.375   
1  0.0  89.75 2021-03-01                       -7.000   
2  0.0  89.50 2021-03-01                       -7.125   
3  0.0  89.25 2021-03-01                       -7.250   
4  0.0  89.00 2021-03-01                       -7.250   

   eastward_wind_at_10_metres  northward_wind_at_100_metres  \
0                     -0.1875                       -0.1250   
1                     -5.0625                        0.7500   
2                     -5.1875                        0.8125   
3                     -5.3750                        1.0625   
4                     -6.0000                        1.5625   

   northward_wind_at_10_metres  
0                       0.0000  
1                       0.0625  
2                       0.1250  
3                       0.3750  
4                       1.1250  
=======================================================
lon                                    float32
lat                                    float32
time0                           datetime64[ns]
eastward_wind_at_100_metres            float32
eastward_wind_at_10_metres             float32
northward_wind_at_100_metres           float32
northward_wind_at_10_metres            float32
dtype: object
=======================================================
...
...
~\Anaconda3\lib\site-packages\pyspark\sql\types.py in verify_struct(obj)
   1394                     verifier(d.get(f))
   1395             else:
-> 1396                 raise TypeError(new_msg("StructType can not accept object %r in type %s"
   1397                                         % (obj, type(obj))))
   1398         verify_value = verify_struct

TypeError: StructType can not accept object 'lon' in type <class 'str'>

Full traceback

TypeError                                 Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_5368/3996125343.py in <module>
     15 print(ddf.dtypes)
     16 print("=======================================================")
---> 17 pyspark_df = spark.createDataFrame(ddf,schema)
     18 pyspark_df.show(10)

~\Anaconda3\lib\site-packages\pyspark\sql\session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    673             return super(SparkSession, self).createDataFrame(
    674                 data, schema, samplingRatio, verifySchema)
--> 675         return self._create_dataframe(data, schema, samplingRatio, verifySchema)
    676 
    677     def _create_dataframe(self, data, schema, samplingRatio, verifySchema):

~\Anaconda3\lib\site-packages\pyspark\sql\session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
    698             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    699         else:
--> 700             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    701         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    702         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

~\Anaconda3\lib\site-packages\pyspark\sql\session.py in _createFromLocal(self, data, schema)
    507         # make sure data could consumed multiple times
    508         if not isinstance(data, list):
--> 509             data = list(data)
    510 
    511         if schema is None or isinstance(schema, (list, tuple)):

~\Anaconda3\lib\site-packages\pyspark\sql\session.py in prepare(obj)
    680 
    681             def prepare(obj):
--> 682                 verify_func(obj)
    683                 return obj
    684         elif isinstance(schema, DataType):

~\Anaconda3\lib\site-packages\pyspark\sql\types.py in verify(obj)
   1407     def verify(obj):
   1408         if not verify_nullability(obj):
-> 1409             verify_value(obj)
   1410 
   1411     return verify

~\Anaconda3\lib\site-packages\pyspark\sql\types.py in verify_struct(obj)
   1394                     verifier(d.get(f))
   1395             else:
-> 1396                 raise TypeError(new_msg("StructType can not accept object %r in type %s"
   1397                                         % (obj, type(obj))))
   1398         verify_value = verify_struct

TypeError: StructType can not accept object 'lon' in type <class 'str'>

Aside: are you sure you want to process this data with Spark? Dask integrates well with xarray, and you may find the conversion is not worth it, especially since you would have to copy and convert the data and keep two cluster systems running.
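For example, a minimal sketch of keeping the whole workflow in xarray/dask, assuming a hypothetical input file wind.nc and using the variable names from the output above:

import xarray as xr

# Open the file lazily with dask-backed arrays (the chunking here is illustrative).
ds = xr.open_dataset("wind.nc", chunks={"time0": 1})

# An example reduction done entirely in dask: mean 10 m eastward wind per time step.
mean_u10 = ds["eastward_wind_at_10_metres"].mean(dim=["lat", "lon"])

# Nothing is read or computed until .compute() is called.
print(mean_u10.compute())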

Short answer: createDataFrame supports pandas exactly, not "pandas-like objects" such as dask dataframes. From the docstring:

    data : :class:`RDD` or iterable
        an RDD of any kind of SQL data representation (:class:`Row`,
        :class:`tuple`, ``int``, ``boolean``, etc.), or :class:`list`, or
        :class:`pandas.DataFrame`.
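
So the simplest fix, if a single time step actually fits in driver memory, is to materialise the dask dataframe as pandas before handing it to Spark. A minimal sketch, reusing the u10 and spark objects from the question:

# Collapse the lazy dask partitions into one in-memory pandas dataframe,
# then let Spark infer the schema from pandas as in the first example.
pdf = u10.isel(time0=0).to_dask_dataframe().compute()
pyspark_df = spark.createDataFrame(pdf)
pyspark_df.show(5)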

Because a dask dataframe is none of these, Spark falls back to treating it as a plain iterable of rows; iterating a dataframe yields its column names as strings, which is why both errors above complain about a str (and specifically about 'lon'). A dask dataframe is made up of lazy partitions, and you are asking Spark to ship them to the Spark workers - that is not a trivial operation. You *could* create a Spark dataframe from each dask partition and union them on the Spark side; that may achieve what you are after, but it will not be efficient:

import functools
import dask
import pyspark.sql

to_spark_df = dask.delayed(spark.createDataFrame)
pieces = dask.compute(*[to_spark_df(d) for d in ddf.to_delayed()])
spark_df = functools.reduce(pyspark.sql.DataFrame.unionAll, pieces)

(Note that this applies the unions in a chain; Spark does not appear to have a simple multi-way concat operation.)
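
A more conventional bridge between the two systems is to go through a storage format both understand, such as Parquet: write the dask dataframe out once and let Spark read it back with its own schema handling. A sketch, assuming a hypothetical path wind_parquet/ that both clusters can reach (and a Parquet engine such as pyarrow installed on the dask side):

# Write one Parquet file per dask partition into a directory.
ddf.to_parquet("wind_parquet/")

# Read the directory back as a Spark dataframe; the schema comes
# from the Parquet metadata, so no manual StructType is needed.
spark_df = spark.read.parquet("wind_parquet/")
spark_df.show(5)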