如何实现动态 BigQueryIO 输入

How to implement dynamic BigQueryIO inputs

我在 Google 数据流上使用 Apache Beam。

我的管道从 BigQuery 读取数据,但它取决于执行参数。 我应该能够 运行 一个点(经度,纬度)和几个点的管道。

只有一点,解决方案很简单:我可以将查询作为 ValueProvider。

select * 
from UserProfile 
where id_ in ( select distinct userid 
               from   locations 
               where  ST_DWITHIN(ST_GeogPoint(longitude, latitude),
                                 ST_GeogPoint(10.9765,50.4322),
                                 300)
             )

问题是当我对 运行 的查询有超过 1 个点时。 我尝试在每个点上应用 BigQuery 读取并将结果合并到一个 PCollection 中,但我不知道如何将这些点传递到管道并动态构建它。

一种方法是先将这些地理点放入 table(比方说 my_points_table),然后在子查询中获取它们:

select * from UserProfile where id_ in 
   (
     select distinct userid from locations l 
     left outer join my_points_table t on 1=1
     where 
      ST_DWITHIN(
        ST_GeogPoint(l.longitude, l.latitude),
        ST_GeogPoint(t.longitude, t.latitude),
      300)
   )

如果点的数量不是太多(我会说少于一千),运行此查询的一种简单方法是提供一个带有点集合 WKT 描述的字符串:

select * 
from UserProfile 
where id_ in ( 
    select distinct userid 
    from   locations 
    where ST_DWITHIN(ST_GeogPoint(longitude, latitude),
                     ST_GeogFromText("MULTIPOINT((10.9765 50.4322), (10 50))"),
                     300)
    )

WKT 字符串应该很容易在您的代码中构建。

如果兴趣点的数量较多,我会选择 table 个点,并在位置 tables 和兴趣点 table 之间进行 JOIN:

select * 
from UserProfile 
where id_ in ( 
    select distinct userid 
    from   locations as l, interesting_points as p
    where ST_DWITHIN(ST_GeogPoint(l.longitude, l.latitude),
                     p.point,
                     300)
    )