pyspark 加入多个数据框

pyspark Join multiple Dataframes

我是 Spark 和大数据领域的新手。 我使用一些气流 DAG 将我的 MySQL 数据库传输到 HDFS,现在每个 table 都是 HDFS 中的镶木地板文件,现在我需要通过数据帧将 blew 查询转换为 pyspark。

SELECT PV.id product_id,
       ZP.vendor_id vendor_id,
       V.title vendor_name,
       PV.barcode barcode,
       PV.title product_title,
       ZP.active product_active,
       ZP.price product_price,
       ZP.capacity product_capacity,
       ZP.stock product_stock,
       MC1.title subcat_title,
       MC2.title parent_category_title,
       ZB.title brand_name
FROM xpediaProductVariationVendorInfo ZP
JOIN ProductVariations PV ON PV.id = ZP.xpediaProductVariation_id
JOIN Vendors V ON ZP.vendor_id = V.id
JOIN VendorTypes vt ON V.vendor_type_id = vt.id
JOIN xpediaProductVariation ZPV ON ZPV.id = PV.id
JOIN MenuCategories MC1 ON PV.menu_category_id = MC1.id
LEFT JOIN MenuCategories MC2 ON MC1.parent_id = MC2.id
LEFT JOIN xpedia_brand ZB ON ZB.id = ZPV.brand_id
WHERE ZP.vendor_id={}
  AND V.status not in ('Suspend')
GROUP BY PV.id,
         ZP.vendor_id;      

那里有很多逻辑,您的分组依据与您选择的列不一致。 (选择了 12 列,仅对 2 列进行了分组) 在 pyspark 中,您将不得不重写如下内容 -


import pyspark.sql.functions as F

df_output = (df_xpediaProductVariationVendorInfo.alias("ZP")
             .join (df_ProductVariations.alias("PV"), F.col("PV.id") == F.col("ZP.xpediaProductVariation_id"))
             .join (df_Vendors.alias("V"), F.col("ZP.vendor_id") == F.col("V.id"))
             .join (df_VendorTypes.alias("vt"), F.col("V.vendor_type_id") == F.col("vt.id"))
             .join (df_xpediaProductVariation.alias("ZPV"), F.col("ZPV.id") == F.col("PV.id"))
             .join (df_MenuCategories.alias("MC1"), F.col("PV.menu_category_id") = F.col("MC1.id"))
             .join (df_MenuCategories.alias("MC2"), F.col("MC1.parent_id") = F.col("MC2.id"), "left")
             .join (df_xpedia_brand.alias("ZB"), F.col("ZB.id") = F.col("ZPV.brand_id"), "left")
             .where ((F.col("ZP.vendor_id") == {}) 
                     & ~(F.col("V.status").isin('Suspend')))
             .select(F.col("PV.id"),
                     F.col("ZP.vendor_id"),
                     F.col("V.title"),
                     F.col("PV.barcode"),
                     F.col("PV.title"),
                     F.col("ZP.active"),
                     F.col("ZP.price"),
                     F.col("ZP.capacity"),
                     F.col("ZP.stock"),
                     F.col("MC1.title"),
                     F.col("MC2.title"),
                     F.col("ZB.title"))
            )

df_output.show()