PySpark TypeErrors
I'm writing a simple CSV-to-Parquet conversion.
There are a few timestamps in the CSV file, so I get a type error when I try to write.
To work around this, I tried this line to identify the timestamp columns and run to_timestamp on them:
rdd = sc.textFile("../../../Downloads/test_type.csv").map(lambda line: [to_timestamp(i) if instr(i,"-")==5 else i for i in line.split(",")])
I get this error:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:/yy/xx/Documents/gg/csv_to_parquet/csv_to_parquet.py", line 55, in <lambda>
rdd = sc.textFile("../../../test/test.csv").map(lambda line: [to_timestamp(i) if (instr(i,"-")==5) else i for i in line.split(",")])
AttributeError: 'NoneType' object has no attribute '_jvm'
Any idea how to implement this?
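The AttributeError comes from calling pyspark.sql.functions inside an RDD map: to_timestamp and instr build JVM Column expressions through the driver's SparkContext, but the lambda runs on executors where that JVM gateway is None. Plain Python string handling works inside the map instead; a minimal sketch of the per-field logic (the timestamp format is an assumption based on the sample data, and parse_field is a hypothetical helper name):

```python
from datetime import datetime

def parse_field(value):
    """Return a datetime for timestamp-looking fields, else the raw string.

    Mirrors the instr(i, "-") == 5 check: SQL instr is 1-based, so a first
    '-' at SQL position 5 is Python index 4 (e.g. '1970-01-01 ...').
    """
    if value.find("-") == 4:
        try:
            # Format assumed from the sample data: 1970-01-01 00:00:01.000
            return datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f")
        except ValueError:
            pass  # looked like a timestamp but wasn't; keep the string
    return value

row = "1970-01-01 00:00:01.000,Effective Date,10.0".split(",")
parsed = [parse_field(v) for v in row]
```

This can replace the to_timestamp/instr calls in the lambda, since everything in it is plain Python and serializes cleanly to the executors.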
==========================================
Version 2
Made some progress today; I'm now writing the Parquet file, but when I query the data I get a binary-vs-timestamp error:
HIVE_BAD_DATA: Field header__timestamp's type BINARY in parquet is incompatible with type timestamp defined in table schema
I modified the code to use StringType for every column initially and then change the data types in the dataframe.
sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)
schema = StructType\
([
StructField("header__change_seq", StringType(), True),
StructField("header__change_oper", StringType(), True),
StructField("header__change_mask", StringType(), True),
StructField("header__stream_position", StringType(), True),
StructField("header__operation", StringType(), True),
StructField("header__transaction_id", StringType(), True),
StructField("header__timestamp", StringType(), True),
StructField("l_en_us", StringType(), True),
StructField("priority", StringType(), True),
StructField("typecode", StringType(), True),
StructField("retired", StringType(), True),
StructField("name", StringType(), True),
StructField("id", StringType(), True),
StructField("description", StringType(), True),
StructField("l_es_ar", StringType(), True),
StructField("adw_updated_ts", StringType(), True),
StructField("adw_process_id", StringType(), True)
])
rdd = sc.textFile("../../../Downloads/pctl_jobdatetype.csv").map(lambda line: line.split(","))
df = sqlContext.createDataFrame(rdd, schema)
df2 = df.withColumn('header__timestamp', df['header__timestamp'].cast('timestamp'))
df2 = df.withColumn('adw_updated_ts', df['adw_updated_ts'].cast('timestamp'))
df2 = df.withColumn('priority', df['priority'].cast('double'))
df2 = df.withColumn('id', df['id'].cast('double'))
df2.write.parquet('../../../Downloads/input-parquet')
Sample data:
"header__change_seq","header__change_oper","header__change_mask","header__stream_position","header__operation","header__transaction_id","header__timestamp","l_en_us","priority","typecode","retired","name","id","description","l_es_ar","adw_updated_ts","adw_process_id"
,"I",,,"IDL",,"1970-01-01 00:00:01.000","Effective Date","10.0","Effective","0","Effective Date","10001.0","Effective Date","Effective Date","2020-02-16 15:45:07.432","fb69d6f6-06fa-4c93-b8d6-bb7c7229ee88"
,"I",,,"IDL",,"1970-01-01 00:00:01.000","Written Date","20.0","Written","0","Written Date","10002.0","Written Date","Written Date","2020-02-16 15:45:07.432","fb69d6f6-06fa-4c93-b8d6-bb7c7229ee88"
,"I",,,"IDL",,"1970-01-01 00:00:01.000","Reference Date","30.0","Reference","0","Reference Date","10003.0","Reference Date","Reference Date","2020-02-16 15:45:07.432","fb69d6f6-06fa-4c93-b8d6-bb7c7229ee88"
After I changed the dataframe name on lines 3-6 below to df2, it seems to work, and Athena returns results too. (Each withColumn call above started from the original df, so only the last cast made it into the df2 that got written; header__timestamp stayed a string, which Parquet stores as BINARY.)
df = sqlContext.createDataFrame(rdd, schema)
df2 = df.withColumn('header__timestamp', df['header__timestamp'].cast('timestamp'))
df2 = df2.withColumn('adw_updated_ts', df['adw_updated_ts'].cast('timestamp'))
df2 = df2.withColumn('priority', df['priority'].cast('double'))
df2 = df2.withColumn('id', df['id'].cast('double'))
df2.write.parquet('../../../Downloads/input-parquet')