PySpark:如何将带逗号的列指定为十进制

PySpark: How to specify column with comma as decimal

我正在使用 PySpark 并加载一个 csv 文件。我有一列欧洲格式的数字,这意味着逗号代替了点,反之亦然。

例如:我有 2.416,67 而不是 2,416.67

My data in .csv file looks like this -    
ID;    Revenue
21;    2.645,45
23;   31.147,05
.
.
55;    1.009,11

在pandas中,通过在pd.read_csv()中指定decimal=','thousands='.'选项来读取欧洲格式,可以轻松读取此类文件。

Pandas代码:

import pandas as pd
df=pd.read_csv("filepath/revenues.csv",sep=';',decimal=',',thousands='.')

我不知道如何在 PySpark 中完成此操作。

PySpark 代码:

from pyspark.sql.types import StructType, StructField, FloatType, StringType
schema = StructType([
            StructField("ID", StringType(), True),
            StructField("Revenue", FloatType(), True)
                    ])
df=spark.read.csv("filepath/revenues.csv",sep=';',encoding='UTF-8', schema=schema, header=True)

谁能建议我们如何使用上述 .csv() 函数在 PySpark 中加载这样的文件?

由于数据的格式,您将无法将其读取为浮点数。您需要将其作为字符串读取,清理并转换为浮点数:

from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import FloatType

df = spark.read.option("headers", "true").option("inferSchema", "true").csv("my_csv.csv", sep=";")
df = df.withColumn('revenue', regexp_replace('revenue', '\.', ''))
df = df.withColumn('revenue', regexp_replace('revenue', ',', '.'))
df = df.withColumn('revenue', df['revenue'].cast("float"))

您也可以将这些链接在一起:

df = spark.read.option("headers", "true").option("inferSchema", "true").csv("my_csv.csv", sep=";")
df = (
         df
         .withColumn('revenue', regexp_replace('revenue', '\.', ''))
         .withColumn('revenue', regexp_replace('revenue', ',', '.'))
         .withColumn('revenue', df['revenue'].cast("float"))
     )

请注意,我尚未对此进行测试,因此其中可能存在一两个错字。

确保您的 SQL table 已预先格式化为读取 NUMERIC 而不是 INTEGER。 我在尝试弄清楚有关编码以及点和逗号等的不同格式,最后问题要原始得多,它被预先格式化为只读整数,因此无论是逗号还是点,都不会接受小数.然后我只需要更改我的 SQL table 以接受实数 (NUMERIC),就是这样。

如果你的数据集有很多浮动列,但数据集的大小仍然足够小,可以先用pandas预处理它,我发现它更容易只需执行以下操作。

import pandas as pd

df_pandas = pd.read_csv('yourfile.csv', sep=';', decimal=',')
df_pandas.to_csv('yourfile__dot_as_decimal_separator.csv', sep=';', decimal='.') # optionally also header=True of course.

df_spark = spark.csv.read('yourfile__dot_as_decimal_separator.csv', sep=';', inferSchema=True) # optionally also header=True of course.

我确实发现 jhole89 的答案非常有用,但发现将它应用于具有很多列(数百个)的数据集很痛苦。

我的意思是:

  • 手动指定浮点列并转换它们很费力,
  • 尝试通过检查哪些列是字符串类型并包含逗号来动态查找它们,避免不考虑带有毫秒分隔符的日期时间列等,强制转换为在某些列上失败的浮点数,因为它们是包含逗号但不打算解析为浮点数的文本:这会让人头疼。

因此,如果有多个浮点列你的数据集可以用pandas预处理,你可以应用以上代码。