AWS Glue Dynamic Filtering - Filter one dynamic frame using another dynamic frame
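I am trying to filter one dynamic frame based on data that resides in another dynamic frame. I am working from the join and relationalize example. In that code the persons and memberships dynamic frames are joined by id, but I want to filter persons based on the ids present in the memberships DF. Below is my code, where I have put in static values: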
import sys
from awsglue.transforms import Filter, Join
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
glueContext = GlueContext(SparkContext.getOrCreate())
# catalog: database and table names
db_name = "legislators"
tbl_persons = "persons_json"
tbl_membership = "memberships_json"
tbl_organization = "organizations_json"
# Create dynamic frames from the source tables
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)
memberships = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_membership)
orgs = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_organization)
persons = persons.drop_fields(['links', 'identifiers','other_names', 'images','contact_details'])
# Keep the fields we need and rename some.
orgs = orgs.drop_fields(['other_names', 'identifiers','links'])
fileredPersons = Filter.apply(frame = persons,
f = lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])
print("Filtered record count: {}".format(fileredPersons.count()))
Below is the filtering logic:
fileredPersons = Filter.apply(frame = persons,
f = lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])
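I want to pass the person_id column from the memberships DF into the filter condition, basically keeping only the persons who appear in memberships. Any help would be appreciated.
You can simply perform an inner join instead of filtering. Convert both dynamic frames to Spark DataFrames first, and join on the person_id column of memberships: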
persons_df, memberships_df = persons.toDF(), memberships.toDF()
persons_filtered = persons_df.alias('persons').join(memberships_df, persons_df.id == memberships_df.person_id).select('persons.*')
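This gives you only the persons that have a matching membership. Note that a person with several memberships appears once per match, so call dropDuplicates() on the result if you need each person only once.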
If your memberships DF is small, or is essentially a lookup table, you can even broadcast it for faster results:
from pyspark.sql.functions import broadcast
persons_df, memberships_df = persons.toDF(), memberships.toDF()
persons_filtered = persons_df.alias('persons').join(broadcast(memberships_df), persons_df.id == memberships_df.person_id).select('persons.*')
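If you would rather keep Filter.apply as in the question, one possible sketch is to collect the distinct person_id values to the driver and test each record against a set. This assumes the list of ids is small enough to fit in driver memory. The helper name make_membership_filter and the sample data are hypothetical, and the Glue calls appear only in comments because they need a Glue job to run:

```python
def make_membership_filter(member_ids):
    """Return a predicate that keeps records whose "id" is in member_ids."""
    ids = frozenset(member_ids)  # constant-time lookups; duplicates collapse

    def keep(record):
        return record["id"] in ids

    return keep


# In the Glue job (assumption: the ids fit in driver memory) this might look like:
#   member_ids = [row["person_id"] for row in
#                 memberships.toDF().select("person_id").distinct().collect()]
#   fileredPersons = Filter.apply(frame=persons, f=make_membership_filter(member_ids))

# Local demonstration on plain dicts standing in for records (made-up ids).
sample_persons = [
    {"id": "a1", "name": "Ann"},
    {"id": "b2", "name": "Bob"},
    {"id": "c3", "name": "Cy"},
]
sample_member_ids = ["a1", "c3", "a1"]

keep = make_membership_filter(sample_member_ids)
kept = [p for p in sample_persons if keep(p)]
print([p["name"] for p in kept])
```

For a large memberships table the join above is likely the better choice, since it avoids pulling every id back to the driver.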
Hope this helps.