PySpark inner join unable to resolve attributes that the DataFrames clearly have

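I have two PySpark DataFrames, tsteval and top_rec. I'm trying to create a new DataFrame, top_rec_tckts, by filtering tsteval down to only the records whose storeid and tz_brand_id match those in top_rec, so that I can get back the storeid and ticketid of those records from tsteval. Sample output from both DataFrames is below; both have storeid and tz_brand_id fields. I don't understand why I get the error below when I try to filter tsteval with an inner join. Does anyone see what the problem is, or can you suggest another way to accomplish this? Sorry, I had to cut a chunk out of the error message below to make it fit. I left the beginning and the end, which I hope gives enough clues to see what is going on.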

tsteval.show(truncate=False)

print('')
top_rec.show(truncate=False)

Sample data:

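+-----------+-------+---+----------+-------------------+------------------------------------+------------+-----------+----------+----------+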
|tz_brand_id|storeid|qty|dateclosed|grossreceipts      |ticketid                            |current_date|filter_date|min_dt    |max_dt    |
+-----------+-------+---+----------+-------------------+------------------------------------+------------+-----------+----------+----------+
|2847       |87     |1.0|2020-06-15|21.1453375         |02c8ec06-a75a-4dd2-89e2-dbbf1dxxxxxx|2020-07-15  |2020-03-17 |2020-03-17|2020-05-16|
|2847       |87     |1.0|2020-05-23|21.1453375         |67a34306-6608-4b00-bf72-f1f42xxxxxx|2020-07-15  |2020-03-17 |2020-03-17|2020-05-16|
|2847       |87     |1.0|2020-05-19|26.129683025000002 |82665853-66ad-4e52-851e-f1cdf8xxxxxx|2020-07-15  |2020-03-17 |2020-03-17|2020-05-16|
|3285       |127    |1.0|2020-06-02|20.642125          |d0898233-64b3-48d8-9a46-a03eefxxxxxx|2020-07-15  |2020-03-17 |2020-03-17|2020-05-16|
|3285       |127    |1.0|2020-05-22|20.642125          |941d2889-230f-4a19-9cb9-90f7b2xxxxxx|2020-07-15  |2020-03-17 |2020-03-17|2020-05-16|
|2747       |77     |1.0|2020-05-30|21.3902            |72c3c7dd-a436-45ae-9adb-f19618xxxxxx|2020-07-15  |2020-03-17 |2020-03-17|2020-05-16|
|9601       |85     |1.0|2020-05-30|23.0               |74328e66-6371-4323-bdf9-057d2xxxxxx|2020-07-15  |2020-03-17 |2020-03-17|2020-05-16|
|9601       |85     |1.0|2020-05-29|20.7               |997ab6b3-b4b5-48e4-884d-00834xxxxxx|2020-07-15  |2020-03-17 |2020-03-17|2020-05-16|
+-----------+-------+---+----------+-------------------+------------------------------------+------------+-----------+----------+----------+
only showing top 20 rows


+-------+----------+-----------+
|storeid|max_dt    |tz_brand_id|
+-------+----------+-----------+
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
|127    |2020-05-16|2799       |
+-------+----------+-----------+

Code:

top_rec_tckts=tsteval.alias('a')\
.join(top_rec.alias('b'),
     (col('a.storeid')==col('b.storeid'))\
      &(col('a.tz_brand_id')==col('b.tz_brand_id')),
      how='inner'
     )\
.select(col('a.storeid'),
       col('a.ticketid'))

Error:

 An error was encountered:
    'Resolved attribute(s) max_dt#6786 missing from storeid#3445,qty#3375,max_dt#3299,min_dt#3289,grossreceipts#3381,filter_date#411,tz_brand_id#3449,ticketid#3387,dateclosed#3390,current_date#403 in operator !Filter ((dateclosed#3390 > min_dt#3289) && (dateclosed#3390 <= max_dt#6786)). Attribute(s) with the same name appear in the operation: max_dt. Please check if the right attribute(s) are used.;;\nJoin Inner, ((storeid#292 = storeid#7081) && (tz_brand_id#296 = tz_brand_id#4560))\n:- SubqueryAlias `a`\n:  +- Filter ((dateclosed#237 > max_dt#3299) && (dateclosed#237 <= date_add(max_dt#3299, 30)))\n:     +- Project [tz_brand_id#296, storeid#292, qty#222, dateclosed#237, grossreceipts#228, ticketid#234, current_date#403, filter_date#411, min_dt#3289, date_add(filter_date#411, 60) AS max_dt#3299]\n:        +- Project [tz_brand_id#296, storeid#292, qty#222, dateclosed#237, grossreceipts#228, ticketid#234, current_date#403, filter_date#411, date_add(filter_date#411, 0) AS min_dt#3289]\n:           +- Filter (dateclosed#237 > filter_date#411)\n:              +- Project [tz_brand_id#296, storeid#292, qty#222, dateclosed#237, grossreceipts#228, ticketid#234, current_date#403, date_add(current_date#403, -120) AS filter_date#411]\n:                 +- Filter storeid#292 IN (85,130,77,127,87)\n:                    +- Project [tz_brand_id#296, storeid#292, qty#222, dateclosed#237, grossreceipts#228, ticketid#234, to_date(cast(unix_timestamp(2020-07-15 21:17:18, yyyy-MM-dd, None) as timestamp), None) AS current_date#403]\n:                       +- Filter isnotnull(tz_brand_id#296)\n:                          +- Filter NOT (storeid#292 = 230)\n:                             +- Project [tz_brand_id#296, storeid#292, qty#222, dateclosed#237, grossreceipts#228, ticketid#234]\n:                                +- Filter (producttype#211 = EDIBLE)\n:                                   +- LogicalRDD [cbd_perc#199, thc_perc#200, register#201, customer_type#202, type#203, 
customer_state#204, customer_city#205, zip_code#206, age#207, age_group#208, cashier#209, approver#210, producttype#211, productsubtype#212, productattributes#213, productbrand#214, productname#215, classification#216, tier#217, weight#218, unitofmeasure#219, size#220, priceunit#221, qty#222, ... 75 more fields], false\n+- SubqueryAlias `b`\n   +- Project [storeid#7081, max_dt#6786, tz_brand_id#4560]\n      +- Project [storeid#7081, max_dt#6786, tz_brand_id#7085, prediction#4548, tz_brand_id#4560, tz_brand_id#4560]\n         +- Window [first(tz_brand_id#7085, true) windowspecdefinition(storeid#7081, max_dt#6786, prediction#4548 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS tz_brand_id#4560], [storeid#7081, max_dt#6786], [prediction#4548 DESC NULLS LAST]\n            +- Project [storeid#7081, max_dt#6786, tz_brand_id#7085, prediction#4548]\n               +- Filter AtLeastNNulls(n, prediction#4548)\n                  +- Project [storeid#7081, tz_brand_id#7085, max_dt#6786, accepted_date#4366, UDF(features#4493, features#4503) AS prediction#4548]\n                     +- Join LeftOuter, (UDF(tz_brand_id#7085) = id#4502)\n                        :- Join LeftOuter, (UDF(storeid#7081) = id#4492)\n                        :  :- Deduplicate [storeid#7081, tz_brand_id#7085, max_dt#6786, accepted_date#4366]\n                        :  :  +- Filter ((cast(max_dt#6786 as timestamp) < accepted_date#4366) || isnull(accepted_date#4366))\n                        :  :     +- Project [storeid#7081, tz_brand_id#7085, max_dt#6786, accepted_date#4366]\n                        :  :        +- Join LeftOuter, ((storeid#7081 = storeid#4102) && (tz_brand_id#7085 = tz_brand_id#4103))\n                        :  :           :- SubqueryAlias `a`\n                        :  :           :  +- Join Cross\n                        :  :           :     :- Project [storeid#7081, max_dt#6786]\n                        :  :           :     :  +- Project 
[storeid#7081, max_dt#6786]\n                        :  :           :     :     +- Project [tz_brand_id#7085, min_dt#3289, max_dt#6786, coalesce((brand_qty#3346 / total_qty#3326), cast(0 as double)) AS norm_qty#3472, storeid#7081]\n                        :  :           :     :        +- Join LeftOuter, (storeid#7081 = storeid#3445)\n                        :  :           :     :           :- SubqueryAlias `a`\n                        :  :           :     :           :  +- Project [storeid#7081, min_dt#3289, max_dt#6786, tz_brand_id#7085, sum(qty)#3339 AS brand_qty#3346]\n                        :  :           :     :           :     +- Aggregate [storeid#7081, min_dt#3289, max_dt#6786, tz_brand_id#7085], [storeid#7081, min_dt#3289, max_dt#6786, tz_brand_id#7085, sum(qty#7011) AS sum(qty)#3339]\n                        :  :           :     :           :        +- Filter ((dateclosed#7026 > min_dt#3289) && (dateclosed#7026 <= max_dt#6786))\n                        :  :           :     :           :           +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, current_date#403, filter_date#411, min_dt#3289, date_add(filter_date#411, 60) AS max_dt#6786]\n                        :  :           :     :           :              +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, current_date#403, filter_date#411, date_add(filter_date#411, 0) AS min_dt#3289]\n                        :  :           :     :           :                 +- Filter (dateclosed#7026 > filter_date#411)\n                        :  :           :     :           :                    +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, current_date#403, date_add(current_date#403, -120) AS filter_date#411]\n                        :  :           :     :           :                       +- Filter storeid#7081 IN (85,130,77,127,87)\n            
            :  :           :     :           :                          +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, to_date(cast(unix_timestamp(2020-07-15 21:17:18, yyyy-MM-dd, None) as timestamp), None) AS current_date#403]\n                        :  :           :     :           :                             +- Filter isnotnull(tz_brand_id#7085)\n                        :  :           :     :           :                                +- Filter NOT (storeid#7081 = 230)\n                        :  :           :     :           :                                   +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023]\n                        :  :           :     :           :                                      +- Filter (producttype#7000 = EDIBLE)\n                        :  :           :     :           :                                         +- LogicalRDD [cbd_perc#6988, thc_perc#6989, register#6990, customer_type#6991, type#6992, customer_state#6993, customer_city#6994, zip_code#6995, age#6996, age_group#6997, cashier#6998, approver#6999, producttype#7000, productsubtype#7001, productattributes#7002, productbrand#7003, productname#7004, classification#7005, tier#7006, weight#7007, unitofmeasure#7008, size#7009, priceunit#7010, qty#7011, ... 
75 more fields], false\n                        :  :           :     :           +- SubqueryAlias `b`\n                        :  :           :     :              +- Project [storeid#3445, sum(qty)#3320 AS total_qty#3326]\n                        :  :           :     :                 +- !Aggregate [storeid#3445, min_dt#3289, max_dt#6786], [storeid#3445, min_dt#3289, max_dt#6786, sum(qty#3375) AS sum(qty)#3320]\n                        :  :           :     :                    +- !Filter ((dateclosed#3390 > min_dt#3289) && (dateclosed#3390 <= max_dt#6786))\n                        :  :           :     :                       +- Project [tz_brand_id#3449, storeid#3445, qty#3375, dateclosed#3390, grossreceipts#3381, ticketid#3387, current_date#403, filter_date#411, min_dt#3289, date_add(filter_date#411, 60) AS max_dt#3299]\n                        :  :           :     :                          +- Project [tz_brand_id#3449, storeid#3445, [tz_brand_id#7085, max_dt#6786]\n                        :  :           :           +- Project [tz_brand_id#7085, min_dt#3289, max_dt#6786, coalesce((brand_qty#3346 / total_qty#3326), cast(0 as double)) AS norm_qty#3472, storeid#7081]\n                        :  :           :              +- Join LeftOuter, (storeid#7081 = storeid#3445)\n                        :  :           :                 :- SubqueryAlias `a`\n                        :  :           :                 :  +- Project [storeid#7081, min_dt#3289, max_dt#6786, tz_brand_id#7085, sum(qty)#3339 AS brand_qty#3346]\n                        :  :           :                 :     +- Aggregate [storeid#7081, min_dt#3289, max_dt#6786, tz_brand_id#7085], [storeid#7081, min_dt#3289, max_dt#6786, tz_brand_id#7085, sum(qty#7011) AS sum(qty)#3339]\n                        :  :           :                 :        +- Filter ((dateclosed#7026 > min_dt#3289) && (dateclosed#7026 <= max_dt#6786))\n                        :  :           :                 :           +- Project 
[tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, current_date#403, filter_date#411, min_dt#3289, date_add(filter_date#411, 60) AS max_dt#6786]\n                        :  :           :                 :              +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, current_date#403, filter_date#411, date_add(filter_date#411, 0) AS min_dt#3289]\n                        :  :           :                 :                 +- Filter (dateclosed#7026 > filter_date#411)\n                        :  :           :                 :                    +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, current_date#403, date_add(current_date#403, -120) AS filter_date#411]\n                        :  :           :                 :                       +- Filter storeid#7081 IN (85,130,77,127,87)\n                        :  :           :                 :                          +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, to_date(cast(unix_timestamp(2020-07-15 21:17:18, yyyy-MM-dd, None) as timestamp), None) AS current_date#403]\n                        :  :           :                 :                             +- Filter isnotnull(tz_brand_id#7085)\n                        :  :           :                 :                                +- Filter NOT (storeid#7081 = 230)\n                        :  :           :                 :                                   
Qty#3948, ... 10 more fields], false\n                        :  +- Project [_1#4489 AS id#4492, _2#4490 AS features#4493]\n                        :     +- SerializeFromObject [assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#4489, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4490]\n                        :        +- ExternalRDD [obj#4488]\n                        +- Project [_1#4499 AS id#4502, _2#4500 AS features#4503]\n                           +- SerializeFromObject [assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#4499, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4500]\n                              +- ExternalRDD [obj#4498]\n'

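I took a sample of your data. You can rename the storeid column to storeid_a in the tsteval DataFrame and then use an inner join. I renamed storeid because it is ambiguous when selecting columns after the join.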

    from pyspark.sql import functions as F

    # `sc` is the SparkContext that the pyspark shell provides
    tsteval = sc.parallelize([
        (2847, 87, 1.0, "2020-06-15", 21.1453375, "02c8ec06-a75a-4dd2-89e2-dbbf1dxxxxxx", "2020-05-16"),
        (3285, 127, 1.0, "2020-06-02", 20.642125, "941d2889-230f-4a19-9cb9-90f7b2xxxxxx", "2020-05-16"),
        (2799, 127, 1.0, "2020-06-03", 23.642125, "997ab6b3-b4b5-48e4-884d-00834xxxxxx", "2020-05-16")
    ]).toDF(["tz_brand_id", "storeid", "qty", "dateclosed", "grossreceipts", "ticketid", "max_dt"])


    # rename the join key on one side so it is unambiguous after the join
    tsteval_rn = tsteval.withColumnRenamed("storeid", "storeid_a")

    tsteval_rn.show()

    # +-----------+---------+---+----------+-------------+--------------------+----------+
    # |tz_brand_id|storeid_a|qty|dateclosed|grossreceipts|            ticketid|    max_dt|
    # +-----------+---------+---+----------+-------------+--------------------+----------+
    # |       2847|       87|1.0|2020-06-15|   21.1453375|02c8ec06-a75a-4dd...|2020-05-16|
    # |       3285|      127|1.0|2020-06-02|    20.642125|941d2889-230f-4a1...|2020-05-16|
    # |       2799|      127|1.0|2020-06-03|    23.642125|997ab6b3-b4b5-48e...|2020-05-16|
    # +-----------+---------+---+----------+-------------+--------------------+----------+



    top_rec = sc.parallelize([
        (127, "2020-05-16", 2799), (127, "2020-05-16", 2799)
    ]).toDF(["storeid", "date", "tz_brand_id"])

    top_rec.show()

    # +-------+----------+-----------+
    # |storeid|      date|tz_brand_id|
    # +-------+----------+-----------+
    # |    127|2020-05-16|       2799|
    # |    127|2020-05-16|       2799|
    # +-------+----------+-----------+

    df3 = tsteval_rn.join(
        top_rec,
        [(tsteval_rn.storeid_a == top_rec.storeid) & (tsteval_rn.tz_brand_id == top_rec.tz_brand_id)],
        how='inner'
    )

    df3.select(F.col('storeid_a').alias("storeid"),'ticketid').dropDuplicates().show(truncate=False)

    # +-------+-----------------------------------+
    # |storeid|ticketid                           |
    # +-------+-----------------------------------+
    # |127    |997ab6b3-b4b5-48e4-884d-00834xxxxxx|
    # +-------+-----------------------------------+