从 Postgres 加载之前的 Pyspark 过滤器结果(不要先加载整个 table)
Pyspark filter results before loading from Postgres (do not load entire table first)
我正在尝试将大量数据从 VPC 中的 RDS Postgres 实例迁移到同一 VPC 中的 redshift 集群。我正在尝试使用 PySpark 和 AWS Glue 来执行此操作。我只想迁移最近 6 个月的数据,但是我的查询似乎正在执行整个 table 的负载,然后过滤它,这会导致内存故障。这是我到目前为止的代码:
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
sc = SparkContext()
sc.setLogLevel('WARN')
glueContext = GlueContext(sc)
spark = glueContext.spark_session
datasource0 = glueContext.create_dynamic_frame.from_catalog(database="db", table_name="table")
datasource0.printSchema()
filtered_dyF = Filter.apply(frame = datasource0, f = lambda x: x["scandate"] > "2020-05-31")
print(filtered_dyF.count())
有什么方法可以在加载查询上应用该过滤器吗?此路径当前尝试 select * from table
,我希望它改为 select * from table where scandate > "2020-05-31"
我最终只使用了 AWS Database Migration Service。实际上很轻松
我正在尝试将大量数据从 VPC 中的 RDS Postgres 实例迁移到同一 VPC 中的 redshift 集群。我正在尝试使用 PySpark 和 AWS Glue 来执行此操作。我只想迁移最近 6 个月的数据,但是我的查询似乎正在执行整个 table 的负载,然后过滤它,这会导致内存故障。这是我到目前为止的代码:
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
sc = SparkContext()
sc.setLogLevel('WARN')
glueContext = GlueContext(sc)
spark = glueContext.spark_session
datasource0 = glueContext.create_dynamic_frame.from_catalog(database="db", table_name="table")
datasource0.printSchema()
filtered_dyF = Filter.apply(frame = datasource0, f = lambda x: x["scandate"] > "2020-05-31")
print(filtered_dyF.count())
有什么方法可以在加载查询上应用该过滤器吗?此路径当前尝试 select * from table
,我希望它改为 select * from table where scandate > "2020-05-31"
我最终只使用了 AWS Database Migration Service。实际上很轻松