Handling JSON schema changes in pyspark
I'm reading JSON log data from an S3 bucket. LogDNA is notorious for changing its log output schema, and they recently did it again.
This leaves me with the following situation:
Files dated <= 2019-07-29 have the schema (simplified)
{_source.message: "Hello World"}
and files dated >= 2019-07-30 have the schema
{message: "Hello World"}
Spark in turn infers the following schema (again simplified):
root
|-- _source: struct (nullable = true)
| |-- message: string (nullable = true)
|-- message: string (nullable = true)
I would like to map both of these schemas into a single DataFrame column. How should I go about this?
Since this schema is discovered automatically, a mapping function would be the best approach, but even a manual mapping would be fine for me at the moment.
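For the simplified schemas above, a manual mapping would boil down to a coalesce over the two candidate locations; a minimal sketch (the bucket path is a placeholder):
from pyspark.sql import functions as F

# Sketch of a manual mapping: prefer the new top-level field, fall back to
# the old nested location. coalesce() takes the first non-null value per row.
df = spark.read.json("s3://my-bucket/logs/*.json.gz")
logs = df.select(
    F.coalesce(F.col("message"), F.col("_source.message")).alias("message")
)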
I've managed to solve it. It's somewhat involved, so I'll share the complete solution in the hope that people will find it useful.
The solution is essentially code generation, SQL style.
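To make that concrete: for the simplified message field, the generator below ends up emitting a chain of nested CASE WHEN fallbacks roughly like this (whitespace tidied for readability):
CASE WHEN message is not null THEN message
ELSE CASE WHEN _source.message is not null THEN _source.message
ELSE null END END as message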
Loading the data
logs_json = spark.read.json("/mnt/seedx-ops-logs-prod/*/*/*.json.gz")
logs_json.registerTempTable("logs_json")
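Reading all the files in one pass makes Spark infer the union of the old and new layouts; printSchema() is a quick way to confirm what was inferred:
# Verify the merged schema Spark inferred across old- and new-format files.
logs_json.printSchema()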
Defining a helper function
# Emit one CASE WHEN clause; prepend ELSE when nesting inside a previous CASE.
def _append_case_when(field, _end_counter):
    _sql = ''
    if _end_counter > 0:
        _sql += ' ELSE '
    _sql += f""" CASE WHEN {field} is not null THEN {field} """
    return _sql
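The _end_counter argument controls the nesting: the first clause emitted for a field is a bare CASE WHEN, and every later clause is prefixed with ELSE so it nests inside the previous one. For example:
_append_case_when("message", 0)
# returns: ' CASE WHEN message is not null THEN message '
_append_case_when("_source.message", 1)
# returns: ' ELSE  CASE WHEN _source.message is not null THEN _source.message '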
Generating the SQL
root = logs_json.schema.jsonValue()

# Complete schema
fields_schema = root['fields']

# The old schema is nested under the {"_source": X} subtree.
_source = next(x for x in fields_schema if x['name'] == '_source')
old_fields_schema = _source['type']['fields']
old_schema = {s['name']: s for s in old_fields_schema}

# New schema
new_schema = {s['name']: s for s in fields_schema}
del new_schema['_source']

keys_new_schema_meta = {field['name'] for field in new_schema['_meta']['type']['fields']}
del new_schema['_meta']
keys_new_schema = set(new_schema.keys())

keys_old_schema_meta = {field['name'] for field in old_schema['_meta']['type']['fields']}
del old_schema['_meta']
keys_old_schema = set(old_schema.keys())

# Every field name seen in any of the four possible locations.
schema_keys = keys_new_schema | keys_new_schema_meta | keys_old_schema | keys_old_schema_meta

STRUCT_SQL = []
for field in schema_keys:
    in_new = field in keys_new_schema
    in_new_meta = field in keys_new_schema_meta
    in_old = field in keys_old_schema
    in_old_meta = field in keys_old_schema_meta

    _sql = ''
    _end_counter = 0

    if in_new:
        _sql += _append_case_when(field, _end_counter)
        _end_counter += 1
    if in_new_meta:
        _sql += _append_case_when(f"_meta.{field}", _end_counter)
        _end_counter += 1
    if in_old:
        _sql += _append_case_when(f"_source.{field}", _end_counter)
        _end_counter += 1
    if in_old_meta:
        _sql += _append_case_when(f"_source._meta.{field}", _end_counter)
        _end_counter += 1

    _sql += ' ELSE null '

    # Close one END per CASE opened above.
    for x in range(_end_counter):
        _sql += ' END '

    _sql += f""" as {field}"""
    STRUCT_SQL.append(_sql)

STRUCT_FORMATTED = ',\n'.join(STRUCT_SQL)
SQL = f"""struct({STRUCT_FORMATTED}) AS FACET"""
logs = spark.sql(f'SELECT FACET.* FROM (select {SQL} from logs_json)')
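Printing the generated statement before running it is a useful sanity check:
# Inspect the generated query before executing it.
print(f'SELECT FACET.* FROM (select {SQL} from logs_json)')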
Example query
logs.createOrReplaceTempView('logs')
%sql
select * from logs