如何(空间)将许多 table 连接到另一个 BigQuery table 的每一行?

How to (spatial) JOIN many tables to each row of another BigQuery table?

简介

这个(完全可重现的)问题:

  1. 显示如何创建模拟数据
  2. 显示我希望如何使用 GeoPandas 处理数据
  3. 显示如何将数据上传到 Google BigQuery 并开始基本处理
  4. 询问如何对这些 table 进行与 GeoPandas 数据帧相同的处理,这次使用 BigQuery,尤其是 Geography functions

from shapely.geometry import Point, Polygon
import matplotlib.pyplot as plt
import geopandas as gpd
import numpy as np
from google.cloud import bigquery
client = bigquery.Client()

创建模拟数据

让我们有一些形状优美的 (docs) 多边形,以及 ps 这些多边形的列表:

p0 = Polygon([[0,0],[2,0],[2,1],[0,1]])
p1 = Polygon([[1,-1],[1.5,-1],[6.5,1],[5,1]])
p2 = Polygon([[8,-1.5],[10,-1.5],[10,-1],[8,-1]])
ps = [p0,p1,p2]

我还有一些形状优美的点列表,让我们将每个列表称为“轨迹”:

track0 = [Point(x,y+0.4) for x, y in zip(np.linspace(0,10,30),np.linspace(-0.1,1,30))]    
track1 = [Point(x,np.sin(x)-0.8) for x in np.linspace(0,8,50)]    
track2 = [Point(9,x) for x in np.linspace(-0.5,0.5,8)]

这些曲目的列表:

tracks = [track0,track1,track2]

要可视化这些对象,我们可以这样做:

for p in ps:
    x, y = p.exterior.xy
    plt.plot(x,y)

for track in tracks:
    for p in track:
        x, y = p.xy
        plt.scatter(x,y,c='r',s=2)

我从多边形创建了一个 GeoDataFrame。每个多边形也将有一个名称:

polydf = gpd.GeoDataFrame({'geometry':ps,'name':['a','b','c']})

也从每个轨道创建 GeoDataFrames:

track0df = gpd.GeoDataFrame({'geometry':track0})
track1df = gpd.GeoDataFrame({'geometry':track1})
track2df = gpd.GeoDataFrame({'geometry':track2})

这些 GeoDataFrame 的列表:

trackdfs = [track0df,track1df,track2df]

使用 GeoPandas 进行处理

根据多边形数据框和轨迹数据框列表,我旨在创建包含每个轨迹与每个多边形的交点的数据框。3 个多边形,3 个轨迹,即最多 9 个数据帧。如果轨道和多边形不相交,则不需要数据框。我愿意:

resdf={}
for i, trackdf in enumerate(trackdfs):                    # iterate over each track
    for poly, name in zip(polydf.geometry,polydf.name):   # iterate over each polygon
        filtered = trackdf[trackdf.geometry.within(poly)] # which points from track are within polygon
        if len(filtered) > 0:                             # if more than 0 points are within polygon, then...
            resdf[f'{i}_{name}'] = filtered               # ...add dataframe to the result, a dict of dfs

resdf 将是:

{'0_a':                   geometry
 1  POINT (0.34483 0.33793)
 2  POINT (0.68966 0.37586)
 3  POINT (1.03448 0.41379)
 4  POINT (1.37931 0.45172)
 5  POINT (1.72414 0.48966),
 '0_b':                    geometry
 14  POINT (4.82759 0.83103)
 15  POINT (5.17241 0.86897)
 16  POINT (5.51724 0.90690)
 17  POINT (5.86207 0.94483)
 18  POINT (6.20690 0.98276),
 '1_a':                    geometry
 6   POINT (0.97959 0.03027)
 7   POINT (1.14286 0.10982)
 8   POINT (1.30612 0.16518)
 9   POINT (1.46939 0.19486)
 10  POINT (1.63265 0.19809)
 11  POINT (1.79592 0.17477)
 12  POINT (1.95918 0.12552),
 '1_b':                     geometry
 16  POINT (2.61224 -0.29503)
 17  POINT (2.77551 -0.44204)}

请注意,每个 key 包含用于生成相应 table 值的轨道编号和多边形的 name(由 [=30= 分隔) ]).

通过绘制它包含的数据帧作为值来检查 resdf 字典:

for p in ps:
    x, y = p.exterior.xy
    plt.plot(x,y)

for f in resdf.values():
    x, y = zip(*f.geometry.apply(lambda row: row.xy))
    plt.scatter(x,y,c='r',s=2)

看起来不错。


上传到 BigQuery

现在我将使用过的数据帧上传到 BigQuery。包含多边形的数据框:

polydf.to_wkt().to_gbq('ourdataset.polydf',project_id='ourproject',if_exists='replace')

轨道数据帧也已上传:

for i, trackdf in enumerate(trackdfs):
    trackdf.to_wkt().to_gbq(f'ourdataset.track{i}df',project_id='ourproject',if_exists='replace')

STRINGGEOGRAPHY

数据以字符串形式上传。将数据类型更改为 BQ 的 GEOGRAPHY 数据类型。 Table 包含多边形:

%%bigquery
CREATE TABLE IF NOT EXISTS ourdataset.proc_polydf AS
(
    SELECT SAFE.ST_GEOGFROMTEXT(geometry,make_valid => TRUE) AS geometry, name AS name
    FROM `ourproject.ourdataset.polydf`
)

对第 table 条轨道执行相同的 STRING -> GEOGRAPHY 过程:

for i in range(len(trackdfs)):
    query_text = \
    f'''
    CREATE TABLE IF NOT EXISTS ourdataset.proc_track{i}df AS
    (
        SELECT SAFE.ST_GEOGFROMTEXT(geometry,make_valid => TRUE) AS geometry
        FROM `ourproject.ourdataset.track{i}df`
    )
    '''
    query_job = client.query(query_text)

我将这些新 table 保存到 proc_polydfproc_track0dfproc_track1dfproc_track2dfproc_ 前缀表示包含多边形和点的列现在是 GEOGRAPHY 类型,而不是 STRINGs。


问题

我想在 BigQuery 中执行与在 GeoPandas 中相同的操作。获取每个 table 的轨道并从中获取位于多边形特定行的多边形内的点 table。我想将结果保存在一个新的 table 中,其名称包括多边形所在列中的 name 条目,以及轨道的名称 table。所以最后,我会有 tables,它清楚地显示了创建它的是哪个多边形和哪个轨道。 (就像 resdf:在那里,每个键显示哪个轨道和哪个多边形用于创建特定条目。)

如何在 BigQuery 中执行此操作?

解决方案越“矢量化”,即 for 循环或类似内容越少,对我来说越好,因为我的真实世界数据库很大。

首先,关于在 BigQuery 中建模此数据的建议

  1. 将每个轨道建模为单独的 table 很浪费。我会为所有曲目使用一个 table,然后添加一个带有 track_id.

    的列
  2. SQL table 是无序的 - 行不保持任何顺序,除非您使用 ORDER BY 或类似运算符将其添加到查询中。因此,如果您将轨道中的点建模为行,并且您确实关心点的顺序:添加另一列,point_id。因此曲目 table 将具有架构:

track_id INT64
point_id INT64
coord GEOGRAPHY
  1. 另一种对此类轨道建模的方法是通过数组。在这种情况下,单行映射到整个轨道,其中一列是一个包含轨道点的数组。在这种情况下,保证保留数组中点的顺序(但不跟踪 table 中的行)。但是您可能 运行 遇到很长轨道的可伸缩性和性能问题,所以我会使用上面的平面架构,但为了完整性,此选项将具有架构
track_id INT64
coords ARRAY<GEOGRAPHY>

现在让我们生成一些类似于您所做的示例数据:

create or replace table tmp.ps as 
select * from (
    select 
        1 poly_id, 
        st_geogfromtext('POLYGON((0 0, 2 0, 2 1, 0 1, 0 0))') p
    union all
    select 
        2 poly_id, 
        st_geogfromtext('POLYGON((1 -1, 1.5 -1, 6.5 1, 5 1, 1 -1))') p
);

create or replace table tmp.tracks as 
select * from (
    select 1 track_id, 
        x point_id,
        ST_GeogPoint(x*0.3, x*0.1 - 0.1) g
    from unnest(generate_array(1, 30)) x
    union all 
    select 2 track_id, 
        x point_id,
        ST_GeogPoint(x*0.4, sin(x*0.4) - 0.8) g
    from unnest(generate_array(1, 50)) x
);

它们在 GeoViz 中,使用查询

select * from
(select p from tmp.ps
union all
select g from tmp.tracks)

我们现在可以加入两个 table:

select poly_id, track_id, g
from tmp.ps
join tmp.tracks
on st_intersects(p, g)
order by poly_id, track_id

它为您提供了多边形内的所有轨迹点、多边形 ID 和轨迹 ID:

poly_id track_id    g
1       1       POINT(1.2 0.3)
1       1       POINT(1.5 0.4)
1       1       POINT(0.3 0)
1       1       POINT(0.6 0.1)
1       1       POINT(1.8 0.5)
1       1       POINT(0.9 0.2)
1       2       POINT(2 0.109297426825682)
1       2       POINT(1.2 0.132039085967226)
1       2       POINT(1.6 0.199573603041505)
2       2       POINT(2.8 -0.465011849844095)