Azure Databricks - 使用 spark.sql 与联合和子查询写入镶木地板文件

Azure Databricks - Write to parquet file using spark.sql with union and subqueries

问题:

我正在尝试使用 spark.sql 写入 parquet 文件,但是在使用联合或子查询时遇到问题。我知道有些语法我似乎无法理解。

例如

%python
    
df = spark.sql("SELECT
          sha2(Code, 256) as COUNTRY_SK,
          Code as COUNTRY_CODE,
          Name as COUNTRY_NAME,
          current_date() as EXTRACT_DATE
       
        FROM raw.EXTR_COUNTRY)
       
    UNION ALL
    
    SELECT
          -1 as COUNTRY_SK,
          'Unknown' as COUNTRY_CODE,
          'Unknown' as COUNTRY_NAME,
          current_date() as EXTRACT_DATE")
          
df.write.parquet("dbfs:/mnt/devstorage/landing/companyx/country", 
   mode="overwrite")

当做一个简单的查询时,我完全没有问题,例如:

%python
    
df = spark.sql("select * from raw.EXTR_COUNTRY")
df.write.parquet("dbfs:/mnt/devstorage/landing/companyx/country/", 
   mode="overwrite")

您的代码有几个问题需要修复:

  • 您对 multi-line 字符串使用了单引号 (")。相反,您需要使用三重引号("""'''
  • 您的 SQL 语法对于查询的第二部分(在 union all 之后)不正确 - 您没有指定 FROM 您需要提取哪个 table数据。有关 SQL 语法的详细信息,请参阅 docs

我真的建议单独调试每个子查询,也许首先使用 %sql,只有在它起作用之后,才将其放入 spark.sql 字符串中。

此外,因为您要覆盖数据,所以使用 create or replace table 语法执行 SQL (docs) 中的所有操作可能更容易,如下所示:

create or replace table delta.`/mnt/devstorage/landing/companyx/country/` AS (
SELECT
          sha2(Code, 256) as COUNTRY_SK,
          Code as COUNTRY_CODE,
          Name as COUNTRY_NAME,
          current_date() as EXTRACT_DATE
       
        FROM raw.EXTR_COUNTRY)
       
    UNION ALL
    
    SELECT
          -1 as COUNTRY_SK,
          'Unknown' as COUNTRY_CODE,
          'Unknown' as COUNTRY_NAME,
          current_date() as EXTRACT_DATE
          FROM ....
)

引号解决了问题,sql-script 本身不是问题。所以使用三重引号(“””或''')解决了这个问题。

 %python
        
    df = spark.sql("""SELECT
              sha2(Code, 256) as COUNTRY_SK,
              Code as COUNTRY_CODE,
              Name as COUNTRY_NAME,
              current_date() as EXTRACT_DATE
           
            FROM raw.EXTR_COUNTRY)
           
        UNION ALL
        
        SELECT
              -1 as COUNTRY_SK,
              'Unknown' as COUNTRY_CODE,
              'Unknown' as COUNTRY_NAME,
              current_date() as EXTRACT_DATE""")
              
    df.write.parquet("dbfs:/mnt/devstorage/landing/companyx/country", 
       mode="overwrite")