PySpark DF 无法执行任何操作

PySpark DF Cannot do any action

这个问题已经出现 3 周了。 长话短说,我的目标是创建一个用于推荐的用户-项目矩阵(余弦、悲伤、...)。

为此,我创建了以下代码。第一个函数列出了我 HDFS 中的所有文件,这样我就可以同时读取我的所有订单、我的产品视图和我添加到购物车的产品。

这些操作工作正常,我可以毫无问题地打印我的数据帧。当我开始对我的 3 个 DF 的联合执行 .distinct 之类的操作时,我无法在我的 df 上执行任何操作,我可以显示、收集、toPandas 而不会出现我无法理解的大错误。

读取 parquet 时出错,但读取后我可以正确读取我的 df,因此读取正确没有问题?

我已经尝试过,写入遗留格式,以及很多其他东西。

你们中的一些人可以帮助我吗? (如有任何问题,请随时提出,或要求更多codes/files)

谢谢! 阿德里安

def get_data_raw_to_df(spark, nb_months=36, id_shop="-1"):
    '''get Data'''

    filename_list_paid = get_date_list(
        spark,
        "hdfs:///" + id_shop + "/master/order",
        id_shop,
        nb_months,
        "paid_"
        )

    filename_list_cart = get_date_list(
        spark,
        "hdfs:///" + id_shop + "/master/product",
        id_shop,
        nb_months,
        "cart_"
        )

    filename_list_view = get_date_list(
        spark,
        "hdfs:///" + id_shop + "/master/product",
        id_shop,
        nb_months,
        "view_"
        )

    empty_rdd = spark.sparkContext.emptyRDD()

    schema = StructType([
        StructField('cid', StringType(), True),
        StructField('sid', StringType(), True),
        StructField('product_id', StringType(), True),
        StructField('buy_product', IntegerType(), True),
        StructField('cart_product', IntegerType(), True),
        StructField('view_product', IntegerType(), True),
    ])

    df_users_items = empty_rdd.toDF(schema)

    if len(filename_list_paid) > 0:
        df_tmp_buy_product = spark.read.parquet(*filename_list_paid)\
                                .filter("is_bot == 'false'")\
                                .select('cid',
                                        'sid',
                                        F.explode('items'))\
                                .select('cid', 'sid',
                                        col("col.product_id"))\
                                .withColumn("buy_product",
                                            F.lit(5))
    if len(filename_list_cart) > 0:
        df_tmp_cart_product = spark.read.parquet(*filename_list_cart)\
                                .filter("is_bot == 'false'")\
                                .select('cid', 'sid',
                                        'product_id')\
                                .withColumn("cart_product",
                                            F.lit(2))
    if len(filename_list_view) > 0:
        df_tmp_view_product = spark.read.parquet(*filename_list_view)\
                                .filter("is_bot == 'false'")\
                                .select('cid', 'sid',
                                        'product_id')\
                                .withColumn("view_product",
                                            F.lit(1))

    try:
        # Setup product buy
        df_users_items = df_users_items\
            .unionByName(df_tmp_buy_product
                         .select('cid',
                                 'product_id',
                                 'sid',
                                 'buy_product'),
                         allowMissingColumns=True)
        # Can do thing form here
        df_tmp = (df_tmp_buy_product.select('cid', 'sid')
                  .unionByName(
                        df_tmp_cart_product.select('cid', 'sid'))
                  .unionByName(
                        df_tmp_view_product.select('cid', 'sid'))).distinct()
# Errors from here
...

错误日志:https://justpaste.it/2y7jc

spark.read.option('mergeSchema', True).parquet(*filename_list_paid) 可能会有帮助。或者您可以尝试阅读此特定文件。

spark.read.parquet('.../order/paid__20220324.parquet/part-00039-ff535f97*').printSchema()看看你有没有String或者其他类型