PySpark DF 无法执行任何操作
PySpark DF Cannot do any action
这个问题已经出现 3 周了。
长话短说,我的目标是创建一个用于推荐的用户-项目矩阵(余弦、悲伤、...)。
为此,我创建了以下代码。第一个函数列出了我 HDFS 中的所有文件,这样我就可以同时读取我的所有订单、我的产品视图和我添加到购物车的产品。
这些操作工作正常,我可以毫无问题地打印我的数据帧。当我开始对我的 3 个 DF 的联合执行 .distinct 之类的操作时,我无法在我的 df 上执行任何操作,我可以显示、收集、toPandas 而不会出现我无法理解的大错误。
读取 parquet 时出错,但读取后我可以正确读取我的 df,因此读取正确没有问题?
我已经尝试过,写入遗留格式,以及很多其他东西。
你们中的一些人可以帮助我吗? (如有任何问题,请随时提出,或要求更多codes/files)
谢谢!
阿德里安
def get_data_raw_to_df(spark, nb_months=36, id_shop="-1"):
'''get Data'''
filename_list_paid = get_date_list(
spark,
"hdfs:///" + id_shop + "/master/order",
id_shop,
nb_months,
"paid_"
)
filename_list_cart = get_date_list(
spark,
"hdfs:///" + id_shop + "/master/product",
id_shop,
nb_months,
"cart_"
)
filename_list_view = get_date_list(
spark,
"hdfs:///" + id_shop + "/master/product",
id_shop,
nb_months,
"view_"
)
empty_rdd = spark.sparkContext.emptyRDD()
schema = StructType([
StructField('cid', StringType(), True),
StructField('sid', StringType(), True),
StructField('product_id', StringType(), True),
StructField('buy_product', IntegerType(), True),
StructField('cart_product', IntegerType(), True),
StructField('view_product', IntegerType(), True),
])
df_users_items = empty_rdd.toDF(schema)
if len(filename_list_paid) > 0:
df_tmp_buy_product = spark.read.parquet(*filename_list_paid)\
.filter("is_bot == 'false'")\
.select('cid',
'sid',
F.explode('items'))\
.select('cid', 'sid',
col("col.product_id"))\
.withColumn("buy_product",
F.lit(5))
if len(filename_list_cart) > 0:
df_tmp_cart_product = spark.read.parquet(*filename_list_cart)\
.filter("is_bot == 'false'")\
.select('cid', 'sid',
'product_id')\
.withColumn("cart_product",
F.lit(2))
if len(filename_list_view) > 0:
df_tmp_view_product = spark.read.parquet(*filename_list_view)\
.filter("is_bot == 'false'")\
.select('cid', 'sid',
'product_id')\
.withColumn("view_product",
F.lit(1))
try:
# Setup product buy
df_users_items = df_users_items\
.unionByName(df_tmp_buy_product
.select('cid',
'product_id',
'sid',
'buy_product'),
allowMissingColumns=True)
# Can do thing form here
df_tmp = (df_tmp_buy_product.select('cid', 'sid')
.unionByName(
df_tmp_cart_product.select('cid', 'sid'))
.unionByName(
df_tmp_view_product.select('cid', 'sid'))).distinct()
# Errors from here
...
spark.read.option('mergeSchema', True).parquet(*filename_list_paid)
可能会有帮助。或者您可以尝试阅读此特定文件。
用spark.read.parquet('.../order/paid__20220324.parquet/part-00039-ff535f97*').printSchema()
看看你有没有String
或者其他类型
这个问题已经出现 3 周了。 长话短说,我的目标是创建一个用于推荐的用户-项目矩阵(余弦、悲伤、...)。
为此,我创建了以下代码。第一个函数列出了我 HDFS 中的所有文件,这样我就可以同时读取我的所有订单、我的产品视图和我添加到购物车的产品。
这些操作工作正常,我可以毫无问题地打印我的数据帧。当我开始对我的 3 个 DF 的联合执行 .distinct 之类的操作时,我无法在我的 df 上执行任何操作,我可以显示、收集、toPandas 而不会出现我无法理解的大错误。
读取 parquet 时出错,但读取后我可以正确读取我的 df,因此读取正确没有问题?
我已经尝试过,写入遗留格式,以及很多其他东西。
你们中的一些人可以帮助我吗? (如有任何问题,请随时提出,或要求更多codes/files)
谢谢! 阿德里安
def get_data_raw_to_df(spark, nb_months=36, id_shop="-1"):
'''get Data'''
filename_list_paid = get_date_list(
spark,
"hdfs:///" + id_shop + "/master/order",
id_shop,
nb_months,
"paid_"
)
filename_list_cart = get_date_list(
spark,
"hdfs:///" + id_shop + "/master/product",
id_shop,
nb_months,
"cart_"
)
filename_list_view = get_date_list(
spark,
"hdfs:///" + id_shop + "/master/product",
id_shop,
nb_months,
"view_"
)
empty_rdd = spark.sparkContext.emptyRDD()
schema = StructType([
StructField('cid', StringType(), True),
StructField('sid', StringType(), True),
StructField('product_id', StringType(), True),
StructField('buy_product', IntegerType(), True),
StructField('cart_product', IntegerType(), True),
StructField('view_product', IntegerType(), True),
])
df_users_items = empty_rdd.toDF(schema)
if len(filename_list_paid) > 0:
df_tmp_buy_product = spark.read.parquet(*filename_list_paid)\
.filter("is_bot == 'false'")\
.select('cid',
'sid',
F.explode('items'))\
.select('cid', 'sid',
col("col.product_id"))\
.withColumn("buy_product",
F.lit(5))
if len(filename_list_cart) > 0:
df_tmp_cart_product = spark.read.parquet(*filename_list_cart)\
.filter("is_bot == 'false'")\
.select('cid', 'sid',
'product_id')\
.withColumn("cart_product",
F.lit(2))
if len(filename_list_view) > 0:
df_tmp_view_product = spark.read.parquet(*filename_list_view)\
.filter("is_bot == 'false'")\
.select('cid', 'sid',
'product_id')\
.withColumn("view_product",
F.lit(1))
try:
# Setup product buy
df_users_items = df_users_items\
.unionByName(df_tmp_buy_product
.select('cid',
'product_id',
'sid',
'buy_product'),
allowMissingColumns=True)
# Can do thing form here
df_tmp = (df_tmp_buy_product.select('cid', 'sid')
.unionByName(
df_tmp_cart_product.select('cid', 'sid'))
.unionByName(
df_tmp_view_product.select('cid', 'sid'))).distinct()
# Errors from here
...
spark.read.option('mergeSchema', True).parquet(*filename_list_paid)
可能会有帮助。或者您可以尝试阅读此特定文件。
用spark.read.parquet('.../order/paid__20220324.parquet/part-00039-ff535f97*').printSchema()
看看你有没有String
或者其他类型