pyspark 加入多个数据框
pyspark Join multiple Dataframes
我是 Spark 和大数据领域的新手。
我使用一些气流 DAG 将我的 MySQL 数据库传输到 HDFS,现在每个 table 都是 HDFS 中的镶木地板文件,现在我需要通过数据帧将 blew 查询转换为 pyspark。
SELECT PV.id product_id,
ZP.vendor_id vendor_id,
V.title vendor_name,
PV.barcode barcode,
PV.title product_title,
ZP.active product_active,
ZP.price product_price,
ZP.capacity product_capacity,
ZP.stock product_stock,
MC1.title subcat_title,
MC2.title parent_category_title,
ZB.title brand_name
FROM xpediaProductVariationVendorInfo ZP
JOIN ProductVariations PV ON PV.id = ZP.xpediaProductVariation_id
JOIN Vendors V ON ZP.vendor_id = V.id
JOIN VendorTypes vt ON V.vendor_type_id = vt.id
JOIN xpediaProductVariation ZPV ON ZPV.id = PV.id
JOIN MenuCategories MC1 ON PV.menu_category_id = MC1.id
LEFT JOIN MenuCategories MC2 ON MC1.parent_id = MC2.id
LEFT JOIN xpedia_brand ZB ON ZB.id = ZPV.brand_id
WHERE ZP.vendor_id={}
AND V.status not in ('Suspend')
GROUP BY PV.id,
ZP.vendor_id;
那里有很多逻辑,您的分组依据与您选择的列不一致。 (选择了 12 列,仅对 2 列进行了分组)
在 pyspark 中,您将不得不重写如下内容 -
import pyspark.sql.functions as F
df_output = (df_xpediaProductVariationVendorInfo.alias("ZP")
.join (df_ProductVariations.alias("PV"), F.col("PV.id") == F.col("ZP.xpediaProductVariation_id"))
.join (df_Vendors.alias("V"), F.col("ZP.vendor_id") == F.col("V.id"))
.join (df_VendorTypes.alias("vt"), F.col("V.vendor_type_id") == F.col("vt.id"))
.join (df_xpediaProductVariation.alias("ZPV"), F.col("ZPV.id") == F.col("PV.id"))
.join (df_MenuCategories.alias("MC1"), F.col("PV.menu_category_id") = F.col("MC1.id"))
.join (df_MenuCategories.alias("MC2"), F.col("MC1.parent_id") = F.col("MC2.id"), "left")
.join (df_xpedia_brand.alias("ZB"), F.col("ZB.id") = F.col("ZPV.brand_id"), "left")
.where ((F.col("ZP.vendor_id") == {})
& ~(F.col("V.status").isin('Suspend')))
.select(F.col("PV.id"),
F.col("ZP.vendor_id"),
F.col("V.title"),
F.col("PV.barcode"),
F.col("PV.title"),
F.col("ZP.active"),
F.col("ZP.price"),
F.col("ZP.capacity"),
F.col("ZP.stock"),
F.col("MC1.title"),
F.col("MC2.title"),
F.col("ZB.title"))
)
df_output.show()
我是 Spark 和大数据领域的新手。 我使用一些气流 DAG 将我的 MySQL 数据库传输到 HDFS,现在每个 table 都是 HDFS 中的镶木地板文件,现在我需要通过数据帧将 blew 查询转换为 pyspark。
SELECT PV.id product_id,
ZP.vendor_id vendor_id,
V.title vendor_name,
PV.barcode barcode,
PV.title product_title,
ZP.active product_active,
ZP.price product_price,
ZP.capacity product_capacity,
ZP.stock product_stock,
MC1.title subcat_title,
MC2.title parent_category_title,
ZB.title brand_name
FROM xpediaProductVariationVendorInfo ZP
JOIN ProductVariations PV ON PV.id = ZP.xpediaProductVariation_id
JOIN Vendors V ON ZP.vendor_id = V.id
JOIN VendorTypes vt ON V.vendor_type_id = vt.id
JOIN xpediaProductVariation ZPV ON ZPV.id = PV.id
JOIN MenuCategories MC1 ON PV.menu_category_id = MC1.id
LEFT JOIN MenuCategories MC2 ON MC1.parent_id = MC2.id
LEFT JOIN xpedia_brand ZB ON ZB.id = ZPV.brand_id
WHERE ZP.vendor_id={}
AND V.status not in ('Suspend')
GROUP BY PV.id,
ZP.vendor_id;
那里有很多逻辑,您的分组依据与您选择的列不一致。 (选择了 12 列,仅对 2 列进行了分组) 在 pyspark 中,您将不得不重写如下内容 -
import pyspark.sql.functions as F
df_output = (df_xpediaProductVariationVendorInfo.alias("ZP")
.join (df_ProductVariations.alias("PV"), F.col("PV.id") == F.col("ZP.xpediaProductVariation_id"))
.join (df_Vendors.alias("V"), F.col("ZP.vendor_id") == F.col("V.id"))
.join (df_VendorTypes.alias("vt"), F.col("V.vendor_type_id") == F.col("vt.id"))
.join (df_xpediaProductVariation.alias("ZPV"), F.col("ZPV.id") == F.col("PV.id"))
.join (df_MenuCategories.alias("MC1"), F.col("PV.menu_category_id") = F.col("MC1.id"))
.join (df_MenuCategories.alias("MC2"), F.col("MC1.parent_id") = F.col("MC2.id"), "left")
.join (df_xpedia_brand.alias("ZB"), F.col("ZB.id") = F.col("ZPV.brand_id"), "left")
.where ((F.col("ZP.vendor_id") == {})
& ~(F.col("V.status").isin('Suspend')))
.select(F.col("PV.id"),
F.col("ZP.vendor_id"),
F.col("V.title"),
F.col("PV.barcode"),
F.col("PV.title"),
F.col("ZP.active"),
F.col("ZP.price"),
F.col("ZP.capacity"),
F.col("ZP.stock"),
F.col("MC1.title"),
F.col("MC2.title"),
F.col("ZB.title"))
)
df_output.show()