Handling JSON schema changes in pyspark

I'm reading JSON log data from an S3 bucket. LogDNA is notorious for changing its log output schema, and it recently did so again.

This leaves me in the following situation:

Files dated <= 2019-07-29 have the (simplified) schema

{_source.message: "Hello World"}

and files dated >= 2019-07-30 have the schema

{message: "Hello World"}

Spark in turn infers the following schema (again, simplified):

root
 |-- _source: struct (nullable = true)
 |    |-- message: string (nullable = true)
 |-- message: string (nullable = true)

I want to map both of these schemas into a single DataFrame column. How should I go about this?

Since this schema is auto-discovered, a mapping function would be the ideal approach, but even a manual mapping would be fine for me ATM.
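For the simplified example above, a manual mapping could be a simple coalesce over the two candidate locations. A minimal sketch, assuming the logs are loaded into a DataFrame df:

from pyspark.sql import functions as F

# Prefer the new top-level field; fall back to the old nested location.
merged = df.select(
  F.coalesce(F.col("message"), F.col("_source.message")).alias("message")
)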

I've managed to solve it. It's a bit involved, so I'll share the full solution below in the hope that people will find it useful.

The solution is essentially code generation, SQL-style: for every field name that appears in either schema, generate a CASE WHEN expression that reads the field from the first location where it is not null.

Loading the data

logs_json = spark.read.json("/mnt/seedx-ops-logs-prod/*/*/*.json.gz")
# registerTempTable is deprecated since Spark 2.0; use createOrReplaceTempView instead.
logs_json.createOrReplaceTempView("logs_json")
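A quick sanity check that both schema variants were picked up, the old one nested under _source and the new one at the top level:

# Should show both the _source struct and the top-level fields.
logs_json.printSchema()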

Defining a helper function

def _append_case_when(field, _end_counter):
  """Return one link of the generated CASE WHEN chain.

  Each candidate location for a field becomes a CASE WHEN test; every
  candidate after the first is prefixed with ELSE, so the fragments nest
  into a single expression.
  """
  _sql = ''

  # All but the outermost CASE are nested inside the previous ELSE branch.
  if _end_counter > 0:
    _sql += ' ELSE '

  _sql += f""" CASE WHEN {field} is not null THEN {field} """

  return _sql
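To see what the helper emits, chain two candidate locations for message (whitespace condensed; the generation loop below adds the final ELSE null and the closing ENDs):

print(_append_case_when("message", 0) + _append_case_when("_source.message", 1))
# CASE WHEN message is not null THEN message
# ELSE CASE WHEN _source.message is not null THEN _source.message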

Generating the SQL

root = logs_json.schema.jsonValue()

# Complete inferred schema
fields_schema = root['fields']

# The old schema is nested under the {"_source": X} subtree.
_source = next(x for x in fields_schema if x['name'] == '_source')
old_fields_schema = _source['type']['fields']
old_schema = {s['name']: s for s in old_fields_schema}

# New schema
new_schema = {s['name']: s for s in fields_schema}
del new_schema['_source']

keys_new_schema_meta = {field['name'] for field in new_schema['_meta']['type']['fields']}
del new_schema['_meta']
keys_new_schema = set(new_schema.keys())


keys_old_schema_meta = {field['name'] for field in old_schema['_meta']['type']['fields']}
del old_schema['_meta']
keys_old_schema = set(old_schema.keys())

schema_keys = keys_new_schema | keys_new_schema_meta | keys_old_schema | keys_old_schema_meta
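At this point it can be worth comparing the key sets to see which fields actually moved between schema versions; an optional sanity check, not required for the solution:

print(sorted(keys_old_schema & keys_new_schema))  # fields present in both schemas
print(sorted(keys_old_schema - keys_new_schema))  # fields that exist only in the old schema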

STRUCT_SQL = []

for field in schema_keys:
  in_new = field in keys_new_schema
  in_new_meta = field in keys_new_schema_meta
  in_old = field in keys_old_schema
  in_old_meta = field in keys_old_schema_meta

  _sql = ''

  _end_counter = 0

  if in_new:
    _sql += _append_case_when(field, _end_counter)
    _end_counter += 1

  if in_new_meta:
    _sql += _append_case_when(f"_meta.{field}", _end_counter)
    _end_counter += 1

  if in_old:
    _sql += _append_case_when(f"_source.{field}", _end_counter)
    _end_counter += 1

  if in_old_meta:
    _sql += _append_case_when(f"_source._meta.{field}", _end_counter)
    _end_counter += 1

  _sql += ' ELSE null '

  for x in range(_end_counter): _sql += ' END '

  _sql += f""" as {field}"""

  STRUCT_SQL.append(_sql)

STRUCT_FORMATTED = ',\n'.join(STRUCT_SQL)
SQL = f"""struct({STRUCT_FORMATTED}) AS FACET"""

logs = spark.sql(f'SELECT FACET.* FROM (select {SQL} from logs_json)')
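Each generated expression can be printed for inspection. For a field present in both schemas, such as message, it comes out roughly as follows (reformatted here for readability):

for s in STRUCT_SQL:
  print(s)

# For "message", roughly:
#   CASE WHEN message is not null THEN message
#   ELSE CASE WHEN _source.message is not null THEN _source.message
#   ELSE null END END as message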

Example query

logs.createOrReplaceTempView('logs')

%sql
select * from logs