PySpark:如何将带逗号的列指定为十进制
PySpark: How to specify column with comma as decimal
我正在使用 PySpark 并加载一个 csv
文件。我有一列欧洲格式的数字,这意味着逗号代替了点,反之亦然。
例如:我有 2.416,67
而不是 2,416.67
。
My data in .csv file looks like this -
ID; Revenue
21; 2.645,45
23; 31.147,05
.
.
55; 1.009,11
在pandas中,通过在pd.read_csv()
中指定decimal=','
和thousands='.'
选项来读取欧洲格式,可以轻松读取此类文件。
Pandas代码:
import pandas as pd
df=pd.read_csv("filepath/revenues.csv",sep=';',decimal=',',thousands='.')
我不知道如何在 PySpark 中完成此操作。
PySpark 代码:
from pyspark.sql.types import StructType, StructField, FloatType, StringType
schema = StructType([
StructField("ID", StringType(), True),
StructField("Revenue", FloatType(), True)
])
df=spark.read.csv("filepath/revenues.csv",sep=';',encoding='UTF-8', schema=schema, header=True)
谁能建议我们如何使用上述 .csv()
函数在 PySpark 中加载这样的文件?
由于数据的格式,您将无法将其读取为浮点数。您需要将其作为字符串读取,清理并转换为浮点数:
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import FloatType
df = spark.read.option("headers", "true").option("inferSchema", "true").csv("my_csv.csv", sep=";")
df = df.withColumn('revenue', regexp_replace('revenue', '\.', ''))
df = df.withColumn('revenue', regexp_replace('revenue', ',', '.'))
df = df.withColumn('revenue', df['revenue'].cast("float"))
您也可以将这些链接在一起:
df = spark.read.option("headers", "true").option("inferSchema", "true").csv("my_csv.csv", sep=";")
df = (
df
.withColumn('revenue', regexp_replace('revenue', '\.', ''))
.withColumn('revenue', regexp_replace('revenue', ',', '.'))
.withColumn('revenue', df['revenue'].cast("float"))
)
请注意,我尚未对此进行测试,因此其中可能存在一两个错字。
确保您的 SQL table 已预先格式化为读取 NUMERIC 而不是 INTEGER。 我在尝试弄清楚有关编码以及点和逗号等的不同格式,最后问题要原始得多,它被预先格式化为只读整数,因此无论是逗号还是点,都不会接受小数.然后我只需要更改我的 SQL table 以接受实数 (NUMERIC),就是这样。
如果你的数据集有很多浮动列,但数据集的大小仍然足够小,可以先用pandas预处理它,我发现它更容易只需执行以下操作。
import pandas as pd
df_pandas = pd.read_csv('yourfile.csv', sep=';', decimal=',')
df_pandas.to_csv('yourfile__dot_as_decimal_separator.csv', sep=';', decimal='.') # optionally also header=True of course.
df_spark = spark.csv.read('yourfile__dot_as_decimal_separator.csv', sep=';', inferSchema=True) # optionally also header=True of course.
我确实发现 jhole89 的答案非常有用,但发现将它应用于具有很多列(数百个)的数据集很痛苦。
我的意思是:
- 手动指定浮点列并转换它们很费力,
- 尝试通过检查哪些列是字符串类型并包含逗号来动态查找它们,避免不考虑带有毫秒分隔符的日期时间列等,强制转换为在某些列上失败的浮点数,因为它们是包含逗号但不打算解析为浮点数的文本:这会让人头疼。
因此,如果有多个浮点列和你的数据集可以用pandas预处理,你可以应用以上代码。
我正在使用 PySpark 并加载一个 csv
文件。我有一列欧洲格式的数字,这意味着逗号代替了点,反之亦然。
例如:我有 2.416,67
而不是 2,416.67
。
My data in .csv file looks like this -
ID; Revenue
21; 2.645,45
23; 31.147,05
.
.
55; 1.009,11
在pandas中,通过在pd.read_csv()
中指定decimal=','
和thousands='.'
选项来读取欧洲格式,可以轻松读取此类文件。
Pandas代码:
import pandas as pd
df=pd.read_csv("filepath/revenues.csv",sep=';',decimal=',',thousands='.')
我不知道如何在 PySpark 中完成此操作。
PySpark 代码:
from pyspark.sql.types import StructType, StructField, FloatType, StringType
schema = StructType([
StructField("ID", StringType(), True),
StructField("Revenue", FloatType(), True)
])
df=spark.read.csv("filepath/revenues.csv",sep=';',encoding='UTF-8', schema=schema, header=True)
谁能建议我们如何使用上述 .csv()
函数在 PySpark 中加载这样的文件?
由于数据的格式,您将无法将其读取为浮点数。您需要将其作为字符串读取,清理并转换为浮点数:
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import FloatType
df = spark.read.option("headers", "true").option("inferSchema", "true").csv("my_csv.csv", sep=";")
df = df.withColumn('revenue', regexp_replace('revenue', '\.', ''))
df = df.withColumn('revenue', regexp_replace('revenue', ',', '.'))
df = df.withColumn('revenue', df['revenue'].cast("float"))
您也可以将这些链接在一起:
df = spark.read.option("headers", "true").option("inferSchema", "true").csv("my_csv.csv", sep=";")
df = (
df
.withColumn('revenue', regexp_replace('revenue', '\.', ''))
.withColumn('revenue', regexp_replace('revenue', ',', '.'))
.withColumn('revenue', df['revenue'].cast("float"))
)
请注意,我尚未对此进行测试,因此其中可能存在一两个错字。
确保您的 SQL table 已预先格式化为读取 NUMERIC 而不是 INTEGER。 我在尝试弄清楚有关编码以及点和逗号等的不同格式,最后问题要原始得多,它被预先格式化为只读整数,因此无论是逗号还是点,都不会接受小数.然后我只需要更改我的 SQL table 以接受实数 (NUMERIC),就是这样。
如果你的数据集有很多浮动列,但数据集的大小仍然足够小,可以先用pandas预处理它,我发现它更容易只需执行以下操作。
import pandas as pd
df_pandas = pd.read_csv('yourfile.csv', sep=';', decimal=',')
df_pandas.to_csv('yourfile__dot_as_decimal_separator.csv', sep=';', decimal='.') # optionally also header=True of course.
df_spark = spark.csv.read('yourfile__dot_as_decimal_separator.csv', sep=';', inferSchema=True) # optionally also header=True of course.
我确实发现 jhole89 的答案非常有用,但发现将它应用于具有很多列(数百个)的数据集很痛苦。
我的意思是:
- 手动指定浮点列并转换它们很费力,
- 尝试通过检查哪些列是字符串类型并包含逗号来动态查找它们,避免不考虑带有毫秒分隔符的日期时间列等,强制转换为在某些列上失败的浮点数,因为它们是包含逗号但不打算解析为浮点数的文本:这会让人头疼。
因此,如果有多个浮点列和你的数据集可以用pandas预处理,你可以应用以上代码。