如何根据 json 方案转换字典,Python3

How to convert a dictionary according to a json scheme, Python3

我有一个json方案,在Python3.

中指定字典的格式
INPUT_SCHEME = {
    "type": "object",
    "properties": {
        "a1": {
            "type": "object",
            "properties": {
                "a1_1": {"type": ["string", "null"]},
                "a1_2": {"type": ["number", "null"]},
            },
            "additionalProperties": False,
            "minProperties": 2,
        },
        "a2": {
            "type": "array",
            "items": {"type": ["number", "null"]},
        },
        "a3": {
            "type": ["number", "null"],
        },
        "a4": {
            "type": "object",
            "properties": {
                "a4_1": {"type": ["string", "null"]},
                "a4_2": {
                    "type": "object",
                    "properties": {
                        "a4_2_1": {"type": ["string", "null"]},
                        "a4_2_2": {"type": ["number", "null"]},
                    },
                    "additionalProperties": False,
                    "minProperties": 2,
                },
            },
            "additionalProperties": False,
            "minProperties": 2,
        },
        "a5": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "a5_1": {"type": ["string", "null"]},
                    "a5_2": {"type": ["number", "null"]},
                },
                "additionalProperties": False,
                "minProperties": 2,
            },
        },
    },
    "additionalProperties": False,
    "minProperties": 5,
}

我想编写一个函数,可以将任意输入字典转换为 INPUT_SCHEME 定义的格式。

规则是:

  1. 如果输入字典缺少一个字段,则在输出字典中用None或空列表填充字段。
  2. 如果输入字典有一个未在 INPUT_SCHEME 中定义的键,则在输出字典中将其删除。

例如,假设我有 a_input,其中只有 'a1' 是正确的。 'a2', 'a3', and 'a4' 不见了。 'a5' 中的每个元素都缺少一个 属性。 'a6' 是一个未定义的字段。 我要编写的函数应该将 a_input 转换为 a_output。你可以使用jsonschema.validate来检查。

a_input = {
    'a1': {'a1_1': 'apple', 'a1_2': 20.5},
    'a5': [{'a5_1': 'pear'}, {'a5_2': 18.5}],
    'a6': [1, 2, 3, 4],
}

a_output = {
    'a1': {'a1_1': 'apple', 'a1_2': 20.5},
    'a2': [],
    'a3': None,
    'a4': {
        'a4_1': None,
        'a4_2': {
            'a4_2_1': None,
            'a4_2_2': None,
        }
    },
    'a5': [
        {
            'a5_1': 'pear',
            'a5_2': None,
        },
        {
            'a5_1': None,
            'a5_2': 18.5,
        }
    ]
}

jsonschema.validate(a_output, schema=INPUT_SCHEME)

我试着写了这个函数,但是没写出来。主要是if-else检查加嵌套结构太多,迷路了。你能帮帮我吗?

谢谢。

def my_func(a_from):
    a_to = dict()
    for key_1 in INPUT_SCHEME['properties'].keys():
        if key_1 not in a_from:
            a_to[key_1] = None  # This is incorrect, since the structure of a_to[key_1] depends on INPUT_SCHEME.
            continue

        layer_1 = INPUT_SCHEME['properties'][key_1]
        if 'properties' in layer_1:  # like a1, a4
            for key_2 in layer_1['properties'].keys():
                layer_2 = layer_1['properties'][key_2]
                ...

                # but it can be a nest of layers. Like a4, there are 3 layers. In real case, it can have more layers.

        elif 'items' in layer_1:
            if 'properties' in layer_1['items']:  # like a5
                ...
            else:  # like a2
                ...
        else:  # like 3
            ...
    return a_to

递归算法适合这个。

我将其分为 2 个不同的功能,因为从模式中删除未定义的属性和填充 non-existent 是两个不同的任务。如果您愿意,可以将它们合并为一个。

为了填充不存在的属性,我只是创建数组、对象和 Nones,然后向内递归。

为了删除未定义的属性,我比较架构键并删除不匹配的键,再次向内递归。

您可能会在代码中看到注释和类型检查:

def fill_nonexistent_properties(input_dictionary, schema):
    """
    Fill missing properties in input_dictionary according to the schema.
    """
    properties = schema['properties']
    missing_properties = set(properties).difference(input_dictionary)

    # Fill all missing properties.
    for key in missing_properties:
        value = properties[key]
        if value['type'] == 'array':
            input_dictionary[key] = []
        elif value['type'] == 'object':
            input_dictionary[key] = {}
        else:
            input_dictionary[key] = None
    
    # Recurse inside all properties.
    for key, value in properties.items():
        
        # If it's an array of objects, recurse inside each item.
        if value['type'] == 'array' and value['items']['type'] == 'object':
            object_list = input_dictionary[key]

            if not isinstance(object_list, list):
                raise ValueError(
                    f"Invalid JSON object: {key} is not a list.")

            for item in object_list:
                if not isinstance(item, dict):
                    raise ValueError(
                        f"Invalid JSON object: {key} is not a list of objects.")
                fill_nonexistent_properties(item, value['items'])

        # If it's an object, recurse inside it.
        elif value['type'] == 'object':
            obj = input_dictionary[key]
            if not isinstance(obj, dict):
                raise ValueError(
                    f"Invalid JSON object: {key} is not a dictionary.")
            fill_nonexistent_properties(obj, value)

def remove_undefined_properties(input_dictionary, schema):
    """
    Remove properties in input_dictionary that are not defined in the schema.
    """
    properties = schema['properties']
    undefined_properties = set(input_dictionary).difference(properties)

    # Remove all undefined properties.
    for key in undefined_properties:
        del input_dictionary[key]
    
    # Recurse inside all existing sproperties.
    for key, value in input_dictionary.items():
        property_shcema = properties[key]

        # If it's an array of objects, recurse inside each item.
        if isinstance(value, list):
            if not property_shcema['type'] == 'array':
                raise ValueError(
                    f"Invalid JSON object: {key} is not a list.")

            # We're only dealing with objects inside arrays.
            if not property_shcema['items']['type'] == 'object':
                continue
            
            for item in value:
                # Make sure each item is an object.
                if not isinstance(item, dict):
                    raise ValueError(
                        f"Invalid JSON object: {key} is not a list of objects.")
                remove_undefined_properties(item, property_shcema['items'])
        
        # If it's an object, recurse inside it.
        elif isinstance(value, dict):
            # Make sure the object is supposed to be an object.
            if not property_shcema['type'] == 'object':
                raise ValueError(
                    f"Invalid JSON object: {key} is not an object.")

            remove_undefined_properties(value, property_shcema)


import pprint
pprint.pprint(a_input)
fill_nonexistent_properties(a_input, INPUT_SCHEME)
remove_undefined_properties(a_input, INPUT_SCHEME)
print("-"*10, "OUTPUT", "-"*10)
pprint.pprint(a_input)

输出:

{'a1': {'a1_1': 'apple', 'a1_2': 20.5},
 'a5': [{'a5_1': 'pear'}, {'a5_2': 18.5}],
 'a6': [1, 2, 3, 4]}
---------- OUTPUT ----------
{'a1': {'a1_1': 'apple', 'a1_2': 20.5},
 'a2': [],
 'a3': None,
 'a4': {'a4_1': None, 'a4_2': {'a4_2_1': None, 'a4_2_2': None}},
 'a5': [{'a5_1': 'pear', 'a5_2': None}, {'a5_1': None, 'a5_2': 18.5}]}