
Python, Select from Nested Dataframe when Input File Changes Format

df = spark.read.json(['/Users/.../input/json/thisistheinputfile.json'])
df.printSchema()

The result looks like this:

root
 |-- _metadata: struct (nullable = true)
 |    |-- bundled: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- bundledIds: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- failedInitializations: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- unbundled: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- anonymousId: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- context: struct (nullable = true)
 |    |-- campaign: struct (nullable = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- medium: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- source: string (nullable = true)
 |    |    |-- term: string (nullable = true)
 |    |    |-- utm_campaign: string (nullable = true)
 |    |    |-- utm_medium: string (nullable = true)
 |    |    |-- utm_term: string (nullable = true)
 |    |-- ip: string (nullable = true)

But in some cases (later on) the input file does not contain some of the fields shown above, because, for example, the campaign information may not be available:

root
 |-- _metadata: struct (nullable = true)
 |    |-- bundled: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- bundledIds: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- failedInitializations: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- unbundled: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- anonymousId: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- context: struct (nullable = true)
 |    |-- ip: string (nullable = true)

I want to be able to select certain columns automatically, without the script crashing when some of the content is unavailable. Note that there are many more columns to select than in the example below:

from pyspark.sql.functions import expr

df_2 = df\
    .select(expr("context.campaign.source").alias("campaign_source"),
            expr("context.campaign.utm_campaign").alias("utm_campaign"),
            'anonymousId')

One scenario could be that anonymousId, ip and context.campaign.source exist, but context.campaign.utm_campaign does not, plus all possible combinations (there can be many columns).

I tried listing the parts I want to look up and checking whether they exist, so that the list could then be used as the input for the dataframe selection. But I found this difficult because the dataframe is nested:

lst = ['anonymousId',
       'source',
       'utm_campaign',
       'ip']
col_exists = []
for col in lst:
    # crude substring check against the schema string
    if df.schema.simpleString().find(col) > 0:
        col_exists.append(col)
    else:
        print('Column', col, 'does not exist')

df_2 = df.select(col_exists)  # does ofc not work...

Any tips on how to work with this kind of nested dataframe?

Thanks in advance!!

The following steps helped solve my problem:

from pyspark.sql.types import ArrayType, StructType

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)
    return fields

def intersection(lst1, lst2):
    # Use of hybrid method
    temp = set(lst2)
    lst3 = [value for value in lst1 if value in temp]
    return lst3
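The intersection helper can be checked with plain lists: it keeps the order of lst1 and uses a set for fast membership tests, and entries missing from lst2 (such as timestamp here) are silently dropped:

```python
def intersection(lst1, lst2):
    # keep lst1's order; set gives O(1) membership tests
    temp = set(lst2)
    return [value for value in lst1 if value in temp]

wanted = ['context.campaign.source', 'anonymousId', 'timestamp']
available = ['anonymousId', 'channel', 'context.campaign.source']
print(intersection(wanted, available))  # ['context.campaign.source', 'anonymousId']
```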

fieldsPathName = flatten(df.schema)
df_prepSchema = df.select(fieldsPathName).toDF(*fieldsPathName)

lst1 = ['context.campaign.source',
        'context.campaign.utm_campaign',
        'timestamp',
        'anonymousId']

# compare against the flattened column names, not the top-level ones
lst2 = df_prepSchema.columns
cols = intersection(lst1, lst2)

# the flattened names contain dots, so they must be backtick-quoted
cols = ['`' + col + '`' for col in cols]
df_2 = df_prepSchema.select(cols)