How to add a column to complex Spark structures with Python?

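I need to use Spark Streaming to add encrypted versions of certain elements to a complex nested structure. The incoming JSON elements can have different schemas, so I'm looking for a dynamic solution that doesn't require hard-coding the Spark schema.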

For example, this is one of the JSONs I can receive:

{
    "hello": "world",
    "thisisastruct": {
        "thisisanarray": [
            {
                "thisisanotherstruct": {
                    "ID": 1,
                    "thisisthevaluetoenrcypt": "imthevaluetoencrypt"
                }
            }
        ]
    }
}

What I want to achieve is the following:

{
    "hello": "world",
    "thisisastruct": {
        "thisisanarray": [
            {
                "thisisanotherstruct": {
                    "ID": 1,
                    "thisisthevaluetoenrcypt": "imthevaluetoencrypt",
                    "thisisthevaluetoenrcypt_masked": "BNHFBYHTYBFDBY"
                }
            }
        ]
    }
}

As I mentioned, the schema can vary, so I may also receive something like this:

{
    "hello": "world",
    "thisisastruct": {
        "thisisanarray": [
            {
                "thisisanotherstruct": {
                    "onemorestruct": {
                        "ID": 1,
                        "thisisthevaluetoenrcypt": "imthevaluetoencrypt"
                    }
                }
            }
        ],
        "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt"
    }
}

and I would like to get something like this:

{
    "hello": "world",
    "thisisastruct": {
        "thisisanarray": [
            {
                "thisisanotherstruct": {
                    "onemorestruct": {
                        "ID": 1,
                        "thisisthevaluetoenrcypt": "imthevaluetoencrypt",
                        "thisisthevaluetoenrcypt_masked": "BNHFBYHTYBFDBY"
                    }
                }
            }
        ],
        "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt",
        "thisisanothervaluetoenrcypt_masked": "TYHRBVTRHTYJTJ"
    }
}

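I have a Python method that encrypts the values; however, I can't change the structure dynamically. I think the following question might help, but unfortunately I have no Scala experience, so I can't translate it to PySpark and modify it to add a new field instead of changing the current value: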

Change value of nested column in DataFrame

Any help would be appreciated.

Edit: this is the function I use to encrypt the data. I'm applying it through a UDF, but I can change that if needed.

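import base64
import boto3

# key_id (the KMS key ID or ARN) is assumed to be defined elsewhere
def encrypt_string(s):
    kms = boto3.client('kms', region_name='us-west-2')
    response = kms.encrypt(
        KeyId=key_id,
        Plaintext=str(s).encode('utf-8'),
    )
    # CiphertextBlob is raw bytes; base64-encode it so it can be stored as a string
    return base64.b64encode(response['CiphertextBlob']).decode('ascii')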
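Creating a new boto3 KMS client on every call adds overhead when the function runs once per row inside a UDF. A minimal sketch, assuming the same `us-west-2` region and the key ID passed in as a parameter, reuses one client per Python process and base64-encodes the ciphertext so the result fits in a string column or a JSON document. The `_get_kms_client` helper, the `key_id` argument and the optional `client` parameter are hypothetical, not part of the original code:

```python
import base64
from typing import Any, Optional

# Hypothetical module-level cache: one KMS client per Python process.
_kms_client: Optional[Any] = None


def _get_kms_client(region_name: str = "us-west-2") -> Any:
    """Create the boto3 KMS client on first use and reuse it afterwards."""
    global _kms_client
    if _kms_client is None:
        import boto3  # imported lazily so this module loads even without boto3
        _kms_client = boto3.client("kms", region_name=region_name)
    return _kms_client


def encrypt_string(s: Any, key_id: str, client: Optional[Any] = None) -> str:
    """Encrypt str(s) with KMS and return the ciphertext as base64 text."""
    kms = client if client is not None else _get_kms_client()
    response = kms.encrypt(KeyId=key_id, Plaintext=str(s).encode("utf-8"))
    # CiphertextBlob is raw bytes; base64 makes it safe for JSON or a StringType column.
    return base64.b64encode(response["CiphertextBlob"]).decode("ascii")
```

Passing a stub object as `client` lets you test the function without AWS credentials. Whether the cached client survives between Spark tasks depends on how the function is serialized to the executors, so treat the caching as best effort.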

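Since your JSONs can have widely different schemas, parsing them into Spark structs would require a schema that is the union of all possible schemas, which is awkward because you don't know in advance what the JSON will look like. I therefore suggest keeping the JSON as a string and using a UDF that parses the string into a dict, adds the encrypted values, and returns the result as a JSON string.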

import json
from typing import Any, Dict, List

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# Two sample rows, one for each JSON shape from the question
data = [('{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}]}}', ), 
        ('{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"onemorestruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}}], "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt"}}', ), ]

df = spark.createDataFrame(data, ("json_col", ))

def encrypt(s: str) -> str:
    # Placeholder: replace with the real encryption function
    return f"encrypted_{s}"

def walk_dict(struct: Dict[str, Any], fields_to_encrypt: List[str]) -> None:
    # Iterate over a snapshot of the keys, because *_masked keys are added inside the loop
    for k in list(struct.keys()):
        if k in fields_to_encrypt and isinstance(struct[k], str):
            struct[f"{k}_masked"] = encrypt(struct[k])
        else:
            walk_fields(struct[k], fields_to_encrypt)

def walk_fields(field: Any, fields_to_encrypt: List[str]) -> None:
    # Recurse into nested objects and arrays; scalar values are left untouched
    if isinstance(field, dict):
        walk_dict(field, fields_to_encrypt)
    elif isinstance(field, list):
        for element in field:
            walk_fields(element, fields_to_encrypt)

def encrypt_fields(json_string: str) -> str:
    if json_string is None:  # Spark passes None for null values
        return None
    fields_to_encrypt = ["thisisthevaluetoenrcypt", "thisisanothervaluetoenrcypt"]
    # Parse the JSON string, add the masked fields in place, and serialize it back
    as_json = json.loads(json_string)
    walk_fields(as_json, fields_to_encrypt)
    return json.dumps(as_json)

field_encryption_udf = F.udf(encrypt_fields, StringType())

df.withColumn("encrypted", field_encryption_udf(F.col("json_col"))).show(truncate=False)

Output:

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|json_col                                                                                                                                                                                                                    |encrypted                                                                                                                                                                                                                                                                                                                                                                  |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}]}}                                                                              |{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt", "thisisthevaluetoenrcypt_masked": "encrypted_imthevaluetoencrypt"}}]}}                                                                                                                                                          |
|{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"onemorestruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}}], "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt"}}|{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"onemorestruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt", "thisisthevaluetoenrcypt_masked": "encrypted_imthevaluetoencrypt"}}}], "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt", "thisisanothervaluetoenrcypt_masked": "encrypted_imtheothervaluetoencrypt"}}|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
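The code above hardcodes both the list of field names and the placeholder `encrypt`. One way to plug in a real encryption function, such as the question's `encrypt_string`, is to build the walker from a closure. This is a pure-Python sketch; `make_encrypt_fields` and its `suffix` parameter are hypothetical names, and the returned function is meant to be wrapped with `F.udf(..., StringType())` just like `encrypt_fields`:

```python
import json
from typing import Any, Callable, Iterable, Optional


def make_encrypt_fields(
    fields_to_encrypt: Iterable[str],
    encrypt: Callable[[str], str],
    suffix: str = "_masked",
) -> Callable[[Optional[str]], Optional[str]]:
    """Return a function that adds `<key><suffix>` next to every matching string field."""
    targets = frozenset(fields_to_encrypt)

    def walk(node: Any) -> None:
        if isinstance(node, dict):
            # Snapshot the keys: new *_masked keys are added while iterating.
            for key in list(node.keys()):
                value = node[key]
                if key in targets and isinstance(value, str):
                    node[key + suffix] = encrypt(value)
                else:
                    walk(value)
        elif isinstance(node, list):
            for element in node:
                walk(element)

    def encrypt_fields(json_string: Optional[str]) -> Optional[str]:
        if json_string is None:  # Spark passes None for null values
            return None
        parsed = json.loads(json_string)
        walk(parsed)
        return json.dumps(parsed)

    return encrypt_fields
```

For example, `F.udf(make_encrypt_fields(["thisisthevaluetoenrcypt", "thisisanothervaluetoenrcypt"], encrypt_string), StringType())` would produce the same kind of `_masked` columns with real ciphertext, provided the encryption function returns a string.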