Python:当输入文件格式发生变化时,如何从嵌套数据框(Nested DataFrame)中选择列
Python, Select from Nested Dataframe when Input File Changes Format
# Read the JSON input file; Spark infers the schema from whatever fields
# happen to be present in this particular file.
df = spark.read.json(['/Users/.../input/json/thisistheinputfile.json'])
# Inspect the inferred schema -- it varies between input files (see below).
df.printSchema()
结果如下:
root
|-- _metadata: struct (nullable = true)
| |-- bundled: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- bundledIds: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- failedInitializations: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- unbundled: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- anonymousId: string (nullable = true)
|-- channel: string (nullable = true)
|-- context: struct (nullable = true)
| |-- campaign: struct (nullable = true)
| | |-- content: string (nullable = true)
| | |-- medium: string (nullable = true)
| | |-- name: string (nullable = true)
| | |-- source: string (nullable = true)
| | |-- term: string (nullable = true)
| | |-- utm_campaign: string (nullable = true)
| | |-- utm_medium: string (nullable = true)
| | |-- utm_term: string (nullable = true)
| |-- ip: string (nullable = true)
但是(稍后)在某些情况下,输入文件不包含上面出现的一些内容,因为例如,可能活动信息不可用:
root
|-- _metadata: struct (nullable = true)
| |-- bundled: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- bundledIds: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- failedInitializations: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- unbundled: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- anonymousId: string (nullable = true)
|-- channel: string (nullable = true)
|-- context: struct (nullable = true)
| |-- ip: string (nullable = true)
我希望能够自动选择(select)某些列,但不希望脚本在某些字段不可用时崩溃。请注意,实际要选择的列比下面示例中的多得多:
# Pick the wanted leaf fields and flatten them into top-level columns.
# NOTE(review): expr() on a missing nested path raises an AnalysisException,
# which is exactly the crash the question is about.
df_2 = df.select(
    expr("context.campaign.source").alias("campaign_source"),
    expr("context.campaign.utm_campaign").alias("utm_campaign"),
    'anonymousId',
)
一种情况可能是 anonymousId、ip 和 context.campaign.source存在,但不存在 context.campaign.utm_campaign 和所有可能的组合(可以有很多列)。
我尝试列出我想要查找的部分并检查它们是否存在,然后可以将该列表用作数据框的输入 selection。但是我发现这很困难,因为我有一个嵌套的数据框:
# Question's attempted approach: substring-search the schema dump for each
# wanted column name, then select the survivors. Reviewer notes on the bugs
# are inline below; this code is quoted as non-working.
lst = ['anonymousId',
       'source',
       'utm_campaign',
       'ip']
col_exists = []
for col in lst:
    # NOTE(review): searches the schema's string dump, so any substring hit
    # (e.g. 'source' inside 'utm_source') counts as "exists" -- false
    # positives. Also references df_seglog_prod while the select uses df.
    if df_seglog_prod.schema.simpleString().find(col) > 0:
        col_exists.append(col)
    else:
        print('Column', col, 'does not exist')
# NOTE(review): 'col_exsists' is a typo for 'col_exists' (NameError), and the
# collected names are bare leaf names, not full nested paths, so the select
# would still fail for nested fields.
df_2 = df.select(col_exsists) #does ofc not work...
关于如何使用这种嵌套数据框有什么技巧吗?
先谢谢了!!
以下步骤帮助解决了我的问题:
def flatten(schema, prefix=None):
    """Return the dotted path names of all leaf fields in a Spark schema.

    Recurses into nested structs, building names like
    'context.campaign.source'. An array of structs is unwrapped one level so
    its element fields are flattened too; other array and scalar fields are
    emitted as leaves.
    """
    leaf_paths = []
    for field in schema.fields:
        path = field.name if prefix is None else prefix + '.' + field.name
        field_type = field.dataType
        # Look through one level of array so arrays of structs recurse below.
        if isinstance(field_type, ArrayType):
            field_type = field_type.elementType
        if isinstance(field_type, StructType):
            leaf_paths.extend(flatten(field_type, prefix=path))
        else:
            leaf_paths.append(path)
    return leaf_paths
def intersection(lst1, lst2):
    """Return the items of lst1 that also occur in lst2, in lst1's order.

    Duplicates in lst1 are kept; lst2 is treated as a membership set, so its
    own order and duplicates are irrelevant.
    """
    members = set(lst2)  # O(1) membership tests instead of O(len(lst2))
    common = []
    for item in lst1:
        if item in members:
            common.append(item)
    return common
# Flatten the nested schema into dotted leaf-path names,
# e.g. 'context.campaign.source'.
fieldsPathName = flatten(df.schema)
# Re-select so every leaf becomes a flat top-level column whose name is its
# dotted path (toDF renames the selected columns to those path strings).
df_prepSchema = df.select(fieldsPathName).toDF(*fieldsPathName)

# Columns we want to keep -- only those actually present will be selected.
lst1 = ['context.campaign.source',
        'context.campaign.utm_campaign',
        'timestamp',
        'anonymousId']

# BUG FIX: compare against the flattened column names, not df.columns.
# df.columns lists only top-level fields ('_metadata', 'context', ...), so a
# nested path like 'context.campaign.source' could never match and the
# intersection would silently drop every nested column.
lst2 = df_prepSchema.columns
cols = intersection(lst1, lst2)

# Wrap each name in backticks so Spark treats the dots as literal characters
# of the column name rather than as struct field access.
cols = ['`' + c + '`' for c in cols]
df_2 = df_prepSchema.select(cols)
# Read the JSON input file; Spark infers the schema from whatever fields
# happen to be present in this particular file.
df = spark.read.json(['/Users/.../input/json/thisistheinputfile.json'])
# Inspect the inferred schema -- it varies between input files (see below).
df.printSchema()
结果如下:
root
|-- _metadata: struct (nullable = true)
| |-- bundled: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- bundledIds: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- failedInitializations: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- unbundled: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- anonymousId: string (nullable = true)
|-- channel: string (nullable = true)
|-- context: struct (nullable = true)
| |-- campaign: struct (nullable = true)
| | |-- content: string (nullable = true)
| | |-- medium: string (nullable = true)
| | |-- name: string (nullable = true)
| | |-- source: string (nullable = true)
| | |-- term: string (nullable = true)
| | |-- utm_campaign: string (nullable = true)
| | |-- utm_medium: string (nullable = true)
| | |-- utm_term: string (nullable = true)
| |-- ip: string (nullable = true)
但是(稍后)在某些情况下,输入文件不包含上面出现的一些内容,因为例如,可能活动信息不可用:
root
|-- _metadata: struct (nullable = true)
| |-- bundled: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- bundledIds: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- failedInitializations: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- unbundled: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- anonymousId: string (nullable = true)
|-- channel: string (nullable = true)
|-- context: struct (nullable = true)
| |-- ip: string (nullable = true)
我希望能够自动选择(select)某些列,但不希望脚本在某些字段不可用时崩溃。请注意,实际要选择的列比下面示例中的多得多:
# Pick the wanted leaf fields and flatten them into top-level columns.
# NOTE(review): expr() on a missing nested path raises an AnalysisException,
# which is exactly the crash the question is about.
df_2 = df.select(
    expr("context.campaign.source").alias("campaign_source"),
    expr("context.campaign.utm_campaign").alias("utm_campaign"),
    'anonymousId',
)
一种情况可能是 anonymousId、ip 和 context.campaign.source存在,但不存在 context.campaign.utm_campaign 和所有可能的组合(可以有很多列)。
我尝试列出我想要查找的部分并检查它们是否存在,然后可以将该列表用作数据框的输入 selection。但是我发现这很困难,因为我有一个嵌套的数据框:
# Question's attempted approach: substring-search the schema dump for each
# wanted column name, then select the survivors. Reviewer notes on the bugs
# are inline below; this code is quoted as non-working.
lst = ['anonymousId',
       'source',
       'utm_campaign',
       'ip']
col_exists = []
for col in lst:
    # NOTE(review): searches the schema's string dump, so any substring hit
    # (e.g. 'source' inside 'utm_source') counts as "exists" -- false
    # positives. Also references df_seglog_prod while the select uses df.
    if df_seglog_prod.schema.simpleString().find(col) > 0:
        col_exists.append(col)
    else:
        print('Column', col, 'does not exist')
# NOTE(review): 'col_exsists' is a typo for 'col_exists' (NameError), and the
# collected names are bare leaf names, not full nested paths, so the select
# would still fail for nested fields.
df_2 = df.select(col_exsists) #does ofc not work...
关于如何使用这种嵌套数据框有什么技巧吗?
先谢谢了!!
以下步骤帮助解决了我的问题:
def flatten(schema, prefix=None):
    """Return the dotted path names of all leaf fields in a Spark schema.

    Recurses into nested structs, building names like
    'context.campaign.source'. An array of structs is unwrapped one level so
    its element fields are flattened too; other array and scalar fields are
    emitted as leaves.
    """
    leaf_paths = []
    for field in schema.fields:
        path = field.name if prefix is None else prefix + '.' + field.name
        field_type = field.dataType
        # Look through one level of array so arrays of structs recurse below.
        if isinstance(field_type, ArrayType):
            field_type = field_type.elementType
        if isinstance(field_type, StructType):
            leaf_paths.extend(flatten(field_type, prefix=path))
        else:
            leaf_paths.append(path)
    return leaf_paths
def intersection(lst1, lst2):
    """Return the items of lst1 that also occur in lst2, in lst1's order.

    Duplicates in lst1 are kept; lst2 is treated as a membership set, so its
    own order and duplicates are irrelevant.
    """
    members = set(lst2)  # O(1) membership tests instead of O(len(lst2))
    common = []
    for item in lst1:
        if item in members:
            common.append(item)
    return common
# Flatten the nested schema into dotted leaf-path names,
# e.g. 'context.campaign.source'.
fieldsPathName = flatten(df.schema)
# Re-select so every leaf becomes a flat top-level column whose name is its
# dotted path (toDF renames the selected columns to those path strings).
df_prepSchema = df.select(fieldsPathName).toDF(*fieldsPathName)

# Columns we want to keep -- only those actually present will be selected.
lst1 = ['context.campaign.source',
        'context.campaign.utm_campaign',
        'timestamp',
        'anonymousId']

# BUG FIX: compare against the flattened column names, not df.columns.
# df.columns lists only top-level fields ('_metadata', 'context', ...), so a
# nested path like 'context.campaign.source' could never match and the
# intersection would silently drop every nested column.
lst2 = df_prepSchema.columns
cols = intersection(lst1, lst2)

# Wrap each name in backticks so Spark treats the dots as literal characters
# of the column name rather than as struct field access.
cols = ['`' + c + '`' for c in cols]
df_2 = df_prepSchema.select(cols)