如何在先验等级为零时分配等级(第 2 部分)

How to distribute ranks when prior rank is zero (part 2)

这是对我先前问题 的扩展。该解决方案非常适用于 postgres 环境,但现在我需要复制到数据块环境 (spark sql)。

问题与之前相同,但现在试图确定如何将此 postgres 查询转换为 spark sql。基本上,如果数据中存在间隙(即按位置和 geo3 分组时没有 micro_geo),它会汇总分配量。对于所有位置和 zip3 组,“估算分配”将等于 1。

这是 postgres 查询,效果很好:

    select location_code, geo3, distance_group, has_micro_geo, imputed_allocation from 
        (
        select ia.*,
               (case when has_micro_geo > 0
                     then sum(allocation) over (partition by location_code, geo3, grp)
                     else 0
                end) as imputed_allocation
        from (select s.*,
                     count(*) filter (where has_micro_geo <> 0) over (partition by location_code, geo3 order by distance_group desc) as grp
              from staging_groups s
             ) ia
        )z

但它翻译得不好,并在数据块中产生此错误:

    Error in SQL statement: ParseException: 
    mismatched input 'from' expecting <EOF>(line 1, pos 78)

    == SQL ==
    select location_code, geo3, distance_group, has_micro_geo, imputed_allocation from 
    ------------------------------------------------------------------------------^^^
        (
        select ia.*,
               (case when has_micro_geo > 0
                     then sum(allocation) over (partition by location_code, geo3, grp)
                     else 0
                end) as imputed_allocation
        from (select s.*,
                     count(*) filter (where has_micro_geo <> 0) over (partition by location_code, geo3 order by distance_group desc) as grp
              from staging_groups s
             ) ia
        )z

或者至少,如何仅转换创建“grp”的内部查询的一部分,然后其余部分可能会起作用。我一直在尝试用其他东西替换这个 filter-where 逻辑,但尝试没有达到预期效果。

    select s.*,
    count(*) filter (where has_micro_geo <> 0) over (partition by location_code, geo3 order by distance_group desc) as grp
    from staging_groups s
    

这是一个带有数据 https://www.db-fiddle.com/f/wisvDZJL9BkWxNFkfLXdEu/0 的 db-fiddle,当前设置为 postgres,但我需要在 spark sql 环境中再次 运行。我试过将其分解并创建不同的表格,但我的小组没有按预期工作。

这是一张可以更好地可视化输出的图像:

您需要重写此子查询:

select s.*,
    count(*) filter (where has_micro_geo <> 0) over (partition by location_code, geo3 order by distance_group desc) as grp
from staging_groups s

虽然 window 和聚合函数的 filter() 子句是标准的 SQL,但目前很少有数据库支持它。相反,考虑一个条件 window sum(),它产生相同的结果:

select s.*,
    sum(case when has_micro_geo <> 0 then 1 else 0 end) over (partition by location_code, geo3 order by distance_group desc) as grp
from staging_groups s

我认为查询的其余部分 运行 在 Spark SQL 中应该没问题。

由于 has_micro_geo 已经是 0/1 标志,您可以将计数(过滤器)重写为

sum(has_micro_geo)
over (partition by location_code, geo3
      order by distance_group desc
      rows unbounded preceding) as grp

添加 rows unbounded preceding 以避免默认的 range unbounded preceding 可能会降低性能。

顺便说一句,我已经在我对 Gordon 对您先前问题的解决方案的评论中写道:-)