Converting a JSON-based log into column format, i.e., one file per column

Sample log file:

{"timestamp": "2022-01-14T00:12:21.000", "Field1": 10, "Field_Doc": {"f1": 0}}
{"timestamp": "2022-01-18T00:15:51.000", "Field_Doc": {"f1": 0, "f2": 1.7, "f3": 2}}

It will generate 5 files:

  1. timestamp.column
  2. Field1.column
  3. Field_Doc.f1.column
  4. Field_Doc.f2.column
  5. Field_Doc.f3.column

The column file format is as follows:

Sample content of timestamp.column:

2022-01-14T00:12:21.000
2022-01-18T00:15:51.000

Note: the fields in the log are dynamic; do not assume these are the expected attributes

Can anyone tell me how to do this?

The log files are about 4 GB to 48 GB in size.

If each JSON object is on its own line, then you can open() the file and read it line by line with for line in file:. Next you can use the json module to convert each line into a dictionary and process it.

You can use for key, value in data.items(): to process each item separately. You can use the key to create the filename f"{key}.column", open it in append mode "a", and write str(value) + "\n" to that file.

Because you have nested dictionaries, you need isinstance(value, dict) to check whether the value is a dictionary like {"f1": 0, "f2": 1.7, "f3": 2} and repeat the code for this dictionary. This may need recursion.


Minimal working code:

I use io only to simulate a file in memory; you should use open(filename).

file_data = '''{"timestamp": "2022-01-14T00:12:21.000", "Field1": 10, "Field_Doc": {"f1": 0}}
{"timestamp": "2022-01-18T00:15:51.000", "Field_Doc": {"f1": 0, "f2": 1.7, "f3": 2}}'''

import json

# --- functions ---

def process_dict(data, prefix=""):
    
    for key, value in data.items():
        
        if prefix:
            key = prefix + "." + key

        if isinstance(value, dict):
            process_dict(value, key)
        else:
            with open(key + '.column', "a") as f:
                f.write(str(value) + "\n")

# --- main ---

#file_obj = open("filename")

import io
file_obj = io.StringIO(file_data)  # emulate file in memory

for line in file_obj:
    data = json.loads(line)
    print(data)
    process_dict(data)
    #process_dict(data, "some prefix for all files")
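A side note beyond the answer above: for log files in the 4 GB to 48 GB range, reopening a .column file in append mode for every single value can dominate the runtime. A possible optimization (my own sketch, not part of the original answer) is to cache open file handles in a dict, so each column file is opened only once:

```python
import io
import json

file_data = '''{"timestamp": "2022-01-14T00:12:21.000", "Field1": 10, "Field_Doc": {"f1": 0}}
{"timestamp": "2022-01-18T00:15:51.000", "Field_Doc": {"f1": 0, "f2": 1.7, "f3": 2}}'''

def process_dict(data, files, prefix=""):
    # walk the dict recursively, writing each leaf value to its column file
    for key, value in data.items():
        if prefix:
            key = prefix + "." + key
        if isinstance(value, dict):
            process_dict(value, files, key)
        else:
            if key not in files:
                # open every column file only once and keep the handle cached;
                # "w" is enough because the handle stays open for the whole run
                files[key] = open(key + ".column", "w")
            files[key].write(str(value) + "\n")

files = {}
file_obj = io.StringIO(file_data)  # emulate file in memory, as above

for line in file_obj:
    process_dict(json.loads(line), files)

for fh in files.values():
    fh.close()
```

With many distinct columns you may eventually hit the OS limit on open file descriptors, so this trade-off only pays off when the number of columns is moderate.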

EDIT:

A more general version: it gets a function as an argument, so it can be used with different functions.

file_data = '''{"timestamp": "2022-01-14T00:12:21.000", "Field1": 10, "Field_Doc": {"f1": 0}}
{"timestamp": "2022-01-18T00:15:51.000", "Field_Doc": {"f1": 0, "f2": 1.7, "f3": 2}}'''

import json

# --- functions ---

def process_dict(data, func, prefix=""):
    
    for key, value in data.items():
        
        if prefix:
            key = prefix + "." + key
        
        if isinstance(value, dict):
            process_dict(value, func, key)
        else:
            func(key, value)

def write_func(key, value):
    with open(key + '.column', "a") as f:
        f.write(str(value) + "\n")

# --- main ---

#file_obj = open("filename")

import io
file_obj = io.StringIO(file_data)  # emulate file in memory

for line in file_obj:
    data = json.loads(line)
    print(data)
    process_dict(data, write_func)
    #process_dict(data, write_func, "some prefix for all files")
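As an illustration of swapping in a different function (my own example, not from the original answer), func can collect column names instead of writing files, which is handy as a first pass over a log whose fields are dynamic:

```python
import io
import json

file_data = '''{"timestamp": "2022-01-14T00:12:21.000", "Field1": 10, "Field_Doc": {"f1": 0}}
{"timestamp": "2022-01-18T00:15:51.000", "Field_Doc": {"f1": 0, "f2": 1.7, "f3": 2}}'''

def process_dict(data, func, prefix=""):
    # same generic walker as above: call func on every leaf key/value pair
    for key, value in data.items():
        if prefix:
            key = prefix + "." + key
        if isinstance(value, dict):
            process_dict(value, func, key)
        else:
            func(key, value)

columns = set()

def collect_func(key, value):
    # ignore the value, only remember which columns exist
    columns.add(key)

for line in io.StringIO(file_data):
    process_dict(json.loads(line), collect_func)

print(sorted(columns))
# ['Field1', 'Field_Doc.f1', 'Field_Doc.f2', 'Field_Doc.f3', 'timestamp']
```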

Another idea to make it more general is to create a function which flattens the dict and creates

{'timestamp': '2022-01-14T00:12:21.000', 'Field1': 10, 'Field_Doc.f1': 0}
{'timestamp': '2022-01-18T00:15:51.000', 'Field_Doc.f1': 0, 'Field_Doc.f2': 1.7, 'Field_Doc.f3': 2}

and later write the elements in a loop:


file_data = '''{"timestamp": "2022-01-14T00:12:21.000", "Field1": 10, "Field_Doc": {"f1": 0}}
{"timestamp": "2022-01-18T00:15:51.000", "Field_Doc": {"f1": 0, "f2": 1.7, "f3": 2}}'''

import json

# --- functions ---

def flatten_dict(data, prefix=""):
    
    result = {}

    for key, value in data.items():
        
        if prefix:
            key = prefix + "." + key
        
        if isinstance(value, dict):
            result.update( flatten_dict(value, key) )
        else:
            result[key] = value
            #result.update( {key: value} )
            
    return result

# --- main ---

#file_obj = open("filename")

import io
file_obj = io.StringIO(file_data)  # emulate file in memory

for line in file_obj:
    data = json.loads(line)
    print('before:', data)
    
    data = flatten_dict(data)
    #data = flatten_dict(data, "some prefix for all items")
    print('after :', data)
    
    print('---')
    
    for key, value in data.items():
        with open(key + '.column', "a") as f:
            f.write(str(value) + "\n")
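A quick self-contained check (my own example) that flatten_dict also handles deeper nesting; note the recursive call has to be flatten_dict itself:

```python
import json

def flatten_dict(data, prefix=""):
    # same function as above, with the recursion going through flatten_dict
    result = {}
    for key, value in data.items():
        if prefix:
            key = prefix + "." + key
        if isinstance(value, dict):
            result.update(flatten_dict(value, key))
        else:
            result[key] = value
    return result

line = '{"a": 1, "b": {"c": 2, "d": {"e": 3}}}'
flat = flatten_dict(json.loads(line))
print(flat)  # {'a': 1, 'b.c': 2, 'b.d.e': 3}
```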