PySpark inner join unable to resolve attributes which clearly exist
I have two PySpark DataFrames, tsteval and top_rec. I'm trying to create a new DataFrame, top_rec_tckts, by filtering tsteval down to only the records whose storeid and tz_brand_id also appear in top_rec, so that I can get the storeid and ticketid of those records back from tsteval. Sample output for both DataFrames is below; they both have storeid and tz_brand_id fields. I don't understand why I get the error below when I try to filter tsteval with an inner join. Does anyone see what the problem is, or can you suggest another way to accomplish this? Sorry, I had to cut a chunk out of the middle of the error message to make it fit; I left the beginning and the end, and I hope those are enough of a clue to see what's going on.
tsteval.show(truncate=False)
print('')
top_rec.show(truncate=False)
Sample data:
+-----------+-------+---+----------+-------------------+------------------------------------+------------+-----------+----------+----------+
|tz_brand_id|storeid|qty|dateclosed|grossreceipts |ticketid |current_date|filter_date|min_dt |max_dt |
+-----------+-------+---+----------+-------------------+------------------------------------+------------+-----------+----------+----------+
|2847 |87 |1.0|2020-06-15|21.1453375 |02c8ec06-a75a-4dd2-89e2-dbbf1dxxxxxx|2020-07-15 |2020-03-17 |2020-03-17|2020-05-16|
|2847 |87 |1.0|2020-05-23|21.1453375 |67a34306-6608-4b00-bf72-f1f42xxxxxx|2020-07-15 |2020-03-17 |2020-03-17|2020-05-16|
|2847 |87 |1.0|2020-05-19|26.129683025000002 |82665853-66ad-4e52-851e-f1cdf8xxxxxx|2020-07-15 |2020-03-17 |2020-03-17|2020-05-16|
|3285 |127 |1.0|2020-06-02|20.642125 |d0898233-64b3-48d8-9a46-a03eefxxxxxx|2020-07-15 |2020-03-17 |2020-03-17|2020-05-16|
|3285 |127 |1.0|2020-05-22|20.642125 |941d2889-230f-4a19-9cb9-90f7b2xxxxxx|2020-07-15 |2020-03-17 |2020-03-17|2020-05-16|
|2747 |77 |1.0|2020-05-30|21.3902 |72c3c7dd-a436-45ae-9adb-f19618xxxxxx|2020-07-15 |2020-03-17 |2020-03-17|2020-05-16|
|9601 |85 |1.0|2020-05-30|23.0 |74328e66-6371-4323-bdf9-057d2xxxxxx|2020-07-15 |2020-03-17 |2020-03-17|2020-05-16|
|9601 |85 |1.0|2020-05-29|20.7 |997ab6b3-b4b5-48e4-884d-00834xxxxxx|2020-07-15 |2020-03-17 |2020-03-17|2020-05-16|
+-----------+-------+---+----------+-------------------+------------------------------------+------------+-----------+----------+----------+
only showing top 20 rows
+-------+----------+-----------+
|storeid|max_dt |tz_brand_id|
+-------+----------+-----------+
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
|127 |2020-05-16|2799 |
+-------+----------+-----------+
Code:
top_rec_tckts = tsteval.alias('a')\
    .join(top_rec.alias('b'),
          (col('a.storeid') == col('b.storeid'))
          & (col('a.tz_brand_id') == col('b.tz_brand_id')),
          how='inner')\
    .select(col('a.storeid'),
            col('a.ticketid'))
Error:
An error was encountered:
'Resolved attribute(s) max_dt#6786 missing from storeid#3445,qty#3375,max_dt#3299,min_dt#3289,grossreceipts#3381,filter_date#411,tz_brand_id#3449,ticketid#3387,dateclosed#3390,current_date#403 in operator !Filter ((dateclosed#3390 > min_dt#3289) && (dateclosed#3390 <= max_dt#6786)). Attribute(s) with the same name appear in the operation: max_dt. Please check if the right attribute(s) are used.;;\nJoin Inner, ((storeid#292 = storeid#7081) && (tz_brand_id#296 = tz_brand_id#4560))\n:- SubqueryAlias `a`\n: +- Filter ((dateclosed#237 > max_dt#3299) && (dateclosed#237 <= date_add(max_dt#3299, 30)))\n: +- Project [tz_brand_id#296, storeid#292, qty#222, dateclosed#237, grossreceipts#228, ticketid#234, current_date#403, filter_date#411, min_dt#3289, date_add(filter_date#411, 60) AS max_dt#3299]\n: +- Project [tz_brand_id#296, storeid#292, qty#222, dateclosed#237, grossreceipts#228, ticketid#234, current_date#403, filter_date#411, date_add(filter_date#411, 0) AS min_dt#3289]\n: +- Filter (dateclosed#237 > filter_date#411)\n: +- Project [tz_brand_id#296, storeid#292, qty#222, dateclosed#237, grossreceipts#228, ticketid#234, current_date#403, date_add(current_date#403, -120) AS filter_date#411]\n: +- Filter storeid#292 IN (85,130,77,127,87)\n: +- Project [tz_brand_id#296, storeid#292, qty#222, dateclosed#237, grossreceipts#228, ticketid#234, to_date(cast(unix_timestamp(2020-07-15 21:17:18, yyyy-MM-dd, None) as timestamp), None) AS current_date#403]\n: +- Filter isnotnull(tz_brand_id#296)\n: +- Filter NOT (storeid#292 = 230)\n: +- Project [tz_brand_id#296, storeid#292, qty#222, dateclosed#237, grossreceipts#228, ticketid#234]\n: +- Filter (producttype#211 = EDIBLE)\n: +- LogicalRDD [cbd_perc#199, thc_perc#200, register#201, customer_type#202, type#203, customer_state#204, customer_city#205, zip_code#206, age#207, age_group#208, cashier#209, approver#210, producttype#211, productsubtype#212, productattributes#213, productbrand#214, productname#215, classification#216, tier#217, 
weight#218, unitofmeasure#219, size#220, priceunit#221, qty#222, ... 75 more fields], false\n+- SubqueryAlias `b`\n +- Project [storeid#7081, max_dt#6786, tz_brand_id#4560]\n +- Project [storeid#7081, max_dt#6786, tz_brand_id#7085, prediction#4548, tz_brand_id#4560, tz_brand_id#4560]\n +- Window [first(tz_brand_id#7085, true) windowspecdefinition(storeid#7081, max_dt#6786, prediction#4548 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS tz_brand_id#4560], [storeid#7081, max_dt#6786], [prediction#4548 DESC NULLS LAST]\n +- Project [storeid#7081, max_dt#6786, tz_brand_id#7085, prediction#4548]\n +- Filter AtLeastNNulls(n, prediction#4548)\n +- Project [storeid#7081, tz_brand_id#7085, max_dt#6786, accepted_date#4366, UDF(features#4493, features#4503) AS prediction#4548]\n +- Join LeftOuter, (UDF(tz_brand_id#7085) = id#4502)\n :- Join LeftOuter, (UDF(storeid#7081) = id#4492)\n : :- Deduplicate [storeid#7081, tz_brand_id#7085, max_dt#6786, accepted_date#4366]\n : : +- Filter ((cast(max_dt#6786 as timestamp) < accepted_date#4366) || isnull(accepted_date#4366))\n : : +- Project [storeid#7081, tz_brand_id#7085, max_dt#6786, accepted_date#4366]\n : : +- Join LeftOuter, ((storeid#7081 = storeid#4102) && (tz_brand_id#7085 = tz_brand_id#4103))\n : : :- SubqueryAlias `a`\n : : : +- Join Cross\n : : : :- Project [storeid#7081, max_dt#6786]\n : : : : +- Project [storeid#7081, max_dt#6786]\n : : : : +- Project [tz_brand_id#7085, min_dt#3289, max_dt#6786, coalesce((brand_qty#3346 / total_qty#3326), cast(0 as double)) AS norm_qty#3472, storeid#7081]\n : : : : +- Join LeftOuter, (storeid#7081 = storeid#3445)\n : : : : :- SubqueryAlias `a`\n : : : : : +- Project [storeid#7081, min_dt#3289, max_dt#6786, tz_brand_id#7085, sum(qty)#3339 AS brand_qty#3346]\n : : : : : +- Aggregate [storeid#7081, min_dt#3289, max_dt#6786, tz_brand_id#7085], [storeid#7081, min_dt#3289, max_dt#6786, tz_brand_id#7085, sum(qty#7011) AS sum(qty)#3339]\n : : : : : +- 
Filter ((dateclosed#7026 > min_dt#3289) && (dateclosed#7026 <= max_dt#6786))\n : : : : : +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, current_date#403, filter_date#411, min_dt#3289, date_add(filter_date#411, 60) AS max_dt#6786]\n : : : : : +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, current_date#403, filter_date#411, date_add(filter_date#411, 0) AS min_dt#3289]\n : : : : : +- Filter (dateclosed#7026 > filter_date#411)\n : : : : : +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, current_date#403, date_add(current_date#403, -120) AS filter_date#411]\n : : : : : +- Filter storeid#7081 IN (85,130,77,127,87)\n : : : : : +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, to_date(cast(unix_timestamp(2020-07-15 21:17:18, yyyy-MM-dd, None) as timestamp), None) AS current_date#403]\n : : : : : +- Filter isnotnull(tz_brand_id#7085)\n : : : : : +- Filter NOT (storeid#7081 = 230)\n : : : : : +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023]\n : : : : : +- Filter (producttype#7000 = EDIBLE)\n : : : : : +- LogicalRDD [cbd_perc#6988, thc_perc#6989, register#6990, customer_type#6991, type#6992, customer_state#6993, customer_city#6994, zip_code#6995, age#6996, age_group#6997, cashier#6998, approver#6999, producttype#7000, productsubtype#7001, productattributes#7002, productbrand#7003, productname#7004, classification#7005, tier#7006, weight#7007, unitofmeasure#7008, size#7009, priceunit#7010, qty#7011, ... 
75 more fields], false\n : : : : +- SubqueryAlias `b`\n : : : : +- Project [storeid#3445, sum(qty)#3320 AS total_qty#3326]\n : : : : +- !Aggregate [storeid#3445, min_dt#3289, max_dt#6786], [storeid#3445, min_dt#3289, max_dt#6786, sum(qty#3375) AS sum(qty)#3320]\n : : : : +- !Filter ((dateclosed#3390 > min_dt#3289) && (dateclosed#3390 <= max_dt#6786))\n : : : : +- Project [tz_brand_id#3449, storeid#3445, qty#3375, dateclosed#3390, grossreceipts#3381, ticketid#3387, current_date#403, filter_date#411, min_dt#3289, date_add(filter_date#411, 60) AS max_dt#3299]\n : : : : +- Project [tz_brand_id#3449, storeid#3445, [tz_brand_id#7085, max_dt#6786]\n : : : +- Project [tz_brand_id#7085, min_dt#3289, max_dt#6786, coalesce((brand_qty#3346 / total_qty#3326), cast(0 as double)) AS norm_qty#3472, storeid#7081]\n : : : +- Join LeftOuter, (storeid#7081 = storeid#3445)\n : : : :- SubqueryAlias `a`\n : : : : +- Project [storeid#7081, min_dt#3289, max_dt#6786, tz_brand_id#7085, sum(qty)#3339 AS brand_qty#3346]\n : : : : +- Aggregate [storeid#7081, min_dt#3289, max_dt#6786, tz_brand_id#7085], [storeid#7081, min_dt#3289, max_dt#6786, tz_brand_id#7085, sum(qty#7011) AS sum(qty)#3339]\n : : : : +- Filter ((dateclosed#7026 > min_dt#3289) && (dateclosed#7026 <= max_dt#6786))\n : : : : +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, current_date#403, filter_date#411, min_dt#3289, date_add(filter_date#411, 60) AS max_dt#6786]\n : : : : +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, current_date#403, filter_date#411, date_add(filter_date#411, 0) AS min_dt#3289]\n : : : : +- Filter (dateclosed#7026 > filter_date#411)\n : : : : +- Project [tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, current_date#403, date_add(current_date#403, -120) AS filter_date#411]\n : : : : +- Filter storeid#7081 IN (85,130,77,127,87)\n : : : : +- Project 
[tz_brand_id#7085, storeid#7081, qty#7011, dateclosed#7026, grossreceipts#7017, ticketid#7023, to_date(cast(unix_timestamp(2020-07-15 21:17:18, yyyy-MM-dd, None) as timestamp), None) AS current_date#403]\n : : : : +- Filter isnotnull(tz_brand_id#7085)\n : : : : +- Filter NOT (storeid#7081 = 230)\n : : : :
Qty#3948, ... 10 more fields], false\n : +- Project [_1#4489 AS id#4492, _2#4490 AS features#4493]\n : +- SerializeFromObject [assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#4489, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4490]\n : +- ExternalRDD [obj#4488]\n +- Project [_1#4499 AS id#4502, _2#4500 AS features#4503]\n +- SerializeFromObject [assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#4499, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4500]\n +- ExternalRDD [obj#4498]\n'
I took a sample of your data. You can rename the storeid column in the tsteval df to storeid_a and then use an inner join. I renamed storeid because it is ambiguous when selecting columns.
from pyspark.sql import functions as F

tsteval = sc.parallelize([
    (2847, 87, 1.0, "2020-06-15", 21.1453375, "02c8ec06-a75a-4dd2-89e2-dbbf1dxxxxxx", "2020-05-16"),
    (3285, 127, 1.0, "2020-06-02", 20.642125, "941d2889-230f-4a19-9cb9-90f7b2xxxxxx", "2020-05-16"),
    (2799, 127, 1.0, "2020-06-03", 23.642125, "997ab6b3-b4b5-48e4-884d-00834xxxxxx", "2020-05-16")
]).toDF(["tz_brand_id", "storeid", "qty", "dateclosed", "grossreceipts", "ticketid", "max_dt"])

tsteval_rn = tsteval.withColumnRenamed("storeid", "storeid_a")
tsteval_rn.show()
# +-----------+---------+---+----------+-------------+--------------------+----------+
# |tz_brand_id|storeid_a|qty|dateclosed|grossreceipts|            ticketid|    max_dt|
# +-----------+---------+---+----------+-------------+--------------------+----------+
# |       2847|       87|1.0|2020-06-15|   21.1453375|02c8ec06-a75a-4dd...|2020-05-16|
# |       3285|      127|1.0|2020-06-02|    20.642125|941d2889-230f-4a1...|2020-05-16|
# |       2799|      127|1.0|2020-06-03|    23.642125|997ab6b3-b4b5-48e...|2020-05-16|
# +-----------+---------+---+----------+-------------+--------------------+----------+
top_rec = sc.parallelize([
    (127, "2020-05-16", 2799), (127, "2020-05-16", 2799)
]).toDF(["storeid", "date", "tz_brand_id"])
top_rec.show()
# +-------+----------+-----------+
# |storeid| date|tz_brand_id|
# +-------+----------+-----------+
# | 127|2020-05-16| 2799|
# | 127|2020-05-16| 2799|
# +-------+----------+-----------+
df3 = tsteval_rn.join(top_rec,
                      (tsteval_rn.storeid_a == top_rec.storeid)
                      & (tsteval_rn.tz_brand_id == top_rec.tz_brand_id),
                      how='inner')
df3.select(F.col('storeid_a').alias("storeid"), 'ticketid').dropDuplicates().show(truncate=False)
# +-------+-----------------------------------+
# |storeid|ticketid |
# +-------+-----------------------------------+
# |127 |997ab6b3-b4b5-48e4-884d-00834xxxxxx|
# +-------+-----------------------------------+