How to add a column to complex Spark structures with python?
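I need to use Spark Streaming to add encrypted versions of certain elements to a complex nested structure. The incoming JSON elements can have different schemas, so I'm looking for a dynamic solution that doesn't require hard-coding the Spark schema.
For example, this is one of the JSONs I can receive: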
{
  "hello": "world",
  "thisisastruct": {
    "thisisanarray": [
      {
        "thisisanotherstruct": {
          "ID": 1,
          "thisisthevaluetoenrcypt": "imthevaluetoencrypt"
        }
      }
    ]
  }
}
What I want to achieve is the following:
{
  "hello": "world",
  "thisisastruct": {
    "thisisanarray": [
      {
        "thisisanotherstruct": {
          "ID": 1,
          "thisisthevaluetoenrcypt": "imthevaluetoencrypt",
          "thisisthevaluetoenrcypt_masked": "BNHFBYHTYBFDBY"
        }
      }
    ]
  }
}
As I mentioned, the schema can vary, so I might also get something like this:
{
  "hello": "world",
  "thisisastruct": {
    "thisisanarray": [
      {
        "thisisanotherstruct": {
          "onemorestruct": {
            "ID": 1,
            "thisisthevaluetoenrcypt": "imthevaluetoencrypt"
          }
        }
      }
    ],
    "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt"
  }
}
In that case I want to get something like this:
{
  "hello": "world",
  "thisisastruct": {
    "thisisanarray": [
      {
        "thisisanotherstruct": {
          "onemorestruct": {
            "ID": 1,
            "thisisthevaluetoenrcypt": "imthevaluetoencrypt",
            "thisisthevaluetoenrcypt_masked": "BNHFBYHTYBFDBY"
          }
        }
      }
    ],
    "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt",
    "thisisanothervaluetoenrcypt_masked": "TYHRBVTRHTYJTJ"
  }
}
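I have a Python method to encrypt the values; however, I can't change the structure dynamically. I think something like the following might help, but unfortunately I have no Scala experience, so I can't convert it to PySpark and change it so that it adds a new field instead of modifying the current value: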
Change value of nested column in DataFrame
Any help would be appreciated.
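For context, the schema-walking idea in the linked Scala answer starts by finding where the target fields live inside a nested schema. The sketch below does only that first step in plain Python, assuming the schema is available in the dictionary format returned by PySpark's `StructType.jsonValue()` (for example `df.schema.jsonValue()`). The function name `find_string_field_paths` and the `[]` marker for array levels are illustrative choices, not part of any Spark API, and rebuilding the struct column (e.g. with `Column.withField`, available since Spark 3.1) is not shown.

```python
from typing import Any, Dict, Iterable, List


def find_string_field_paths(schema: Dict[str, Any], targets: Iterable[str]) -> List[str]:
    """Return dotted paths to string fields whose name is in `targets`.

    `schema` is assumed to follow the StructType.jsonValue() layout;
    array levels are marked with "[]" in the returned paths.
    """
    wanted = frozenset(targets)
    paths: List[str] = []

    def walk(dtype: Any, prefix: str) -> None:
        if isinstance(dtype, dict) and dtype.get("type") == "struct":
            for field in dtype["fields"]:
                name = field["name"]
                path = f"{prefix}.{name}" if prefix else name
                if name in wanted and field["type"] == "string":
                    paths.append(path)
                else:
                    walk(field["type"], path)
        elif isinstance(dtype, dict) and dtype.get("type") == "array":
            walk(dtype["elementType"], prefix + "[]")
        # Primitive types ("string", "long", ...) and maps are not descended into

    walk(schema, "")
    return paths


def field(name: str, dtype: Any) -> Dict[str, Any]:
    # Hypothetical helper that builds one StructField entry in jsonValue() form
    return {"name": name, "type": dtype, "nullable": True, "metadata": {}}


# Hand-written schema mirroring the second example JSON above (simplified)
example_schema = {"type": "struct", "fields": [
    field("hello", "string"),
    field("thisisastruct", {"type": "struct", "fields": [
        field("thisisanarray", {"type": "array", "containsNull": True, "elementType": {
            "type": "struct", "fields": [
                field("thisisanotherstruct", {"type": "struct", "fields": [
                    field("ID", "long"),
                    field("thisisthevaluetoenrcypt", "string"),
                ]}),
            ]}}),
        field("thisisanothervaluetoenrcypt", "string"),
    ]}),
]}

paths = find_string_field_paths(
    example_schema, ["thisisthevaluetoenrcypt", "thisisanothervaluetoenrcypt"])
print(paths)
```

Every distinct incoming schema would need its own pass like this, which is part of why the struct-based route gets awkward when the schemas vary freely.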
Edit: Here is the function I use to encrypt the data. I apply it through a UDF, but I can change that if needed:
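import boto3

def encrypt_string(s):
    # key_id (the KMS key ID or ARN) is assumed to be defined elsewhere
    kms = boto3.client('kms', region_name='us-west-2')
    response = kms.encrypt(
        KeyId=key_id,
        Plaintext=str(s)
    )
    # CiphertextBlob is raw bytes, not a string
    return response['CiphertextBlob']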
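Because KMS returns the ciphertext as raw bytes, it has to be turned into text before it can be stored as a `*_masked` value in JSON or returned from a string UDF. A common option is Base64. The sketch below wraps any bytes-to-bytes cipher in a function that returns ASCII text; `make_masker` and `fake_cipher` are hypothetical names, and the byte-reversing `fake_cipher` exists only so the example runs without AWS credentials.

```python
import base64
from typing import Callable


def make_masker(encrypt_bytes: Callable[[bytes], bytes]) -> Callable[[str], str]:
    """Wrap a bytes -> bytes cipher so that it returns JSON-safe text."""
    def mask(value: str) -> str:
        ciphertext = encrypt_bytes(str(value).encode("utf-8"))
        # Base64 turns arbitrary ciphertext bytes into ASCII text
        return base64.b64encode(ciphertext).decode("ascii")

    return mask


def fake_cipher(data: bytes) -> bytes:
    """Stand-in for local testing only; reversing bytes is NOT encryption."""
    return data[::-1]


mask = make_masker(fake_cipher)
print(mask("imthevaluetoencrypt"))
```

With KMS, one would pass something like `lambda b: kms.encrypt(KeyId=key_id, Plaintext=b)["CiphertextBlob"]`, ideally with the `kms` client created once per Python worker rather than once per value.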
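Since your JSONs can have widely different schemas, parsing them into Spark structs would require a schema that is the union of all possible schemas, which is unwieldy because you don't know in advance what the JSON will look like. I therefore suggest keeping the JSON as a string and using a UDF that parses the string into a dict, adds the encrypted values, and returns the result as a JSON string.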
from typing import Dict, Any, List
import json

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Reuses the active session in a notebook or pyspark shell
spark = SparkSession.builder.getOrCreate()

data = [('{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}]}}', ),
        ('{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"onemorestruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}}], "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt"}}', ), ]
df = spark.createDataFrame(data, ("json_col", ))

def encrypt(s: str) -> str:
    # Placeholder; replace with the real (e.g. KMS-based) encryption
    return f"encrypted_{s}"

def walk_dict(struct: Dict[str, Any], fields_to_encrypt: List[str]):
    # Copy the keys first, because *_masked keys are added while iterating
    keys_copy = list(struct.keys())
    for k in keys_copy:
        if k in fields_to_encrypt and isinstance(struct[k], str):
            struct[f"{k}_masked"] = encrypt(struct[k])
        else:
            walk_fields(struct[k], fields_to_encrypt)

def walk_fields(field: Any, fields_to_encrypt: List[str]):
    # Recurse into objects and arrays; scalar values are left unchanged
    if isinstance(field, dict):
        walk_dict(field, fields_to_encrypt)
    elif isinstance(field, list):
        for e in field:
            walk_fields(e, fields_to_encrypt)

def encrypt_fields(json_string: str) -> str:
    fields_to_encrypt = ["thisisthevaluetoenrcypt", "thisisanothervaluetoenrcypt"]
    as_json = json.loads(json_string)
    walk_fields(as_json, fields_to_encrypt)
    return json.dumps(as_json)

field_encryption_udf = F.udf(encrypt_fields, StringType())
df.withColumn("encrypted", field_encryption_udf(F.col("json_col"))).show(truncate=False)
Output:
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|json_col |encrypted |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}]}} |{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt", "thisisthevaluetoenrcypt_masked": "encrypted_imthevaluetoencrypt"}}]}} |
|{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"onemorestruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}}], "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt"}}|{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"onemorestruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt", "thisisthevaluetoenrcypt_masked": "encrypted_imthevaluetoencrypt"}}}], "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt", "thisisanothervaluetoenrcypt_masked": "encrypted_imtheothervaluetoencrypt"}}|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
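In a streaming job the list of fields and the encryption function usually come from configuration, and a `null` in `json_col` would make `json.loads` raise a `TypeError`. The sketch below is one way to package the same recursive walk as a factory that takes both as parameters and passes nulls through; `make_encrypt_fields` is a hypothetical name and the lambda is only a stand-in cipher.

```python
import json
from typing import Any, Callable, Iterable, Optional


def make_encrypt_fields(fields_to_encrypt: Iterable[str],
                        encrypt: Callable[[str], str]) -> Callable[[Optional[str]], Optional[str]]:
    """Build a str -> str function that adds `<key>_masked` next to every
    string value whose key is in `fields_to_encrypt`, at any depth."""
    wanted = frozenset(fields_to_encrypt)

    def walk(node: Any) -> None:
        if isinstance(node, dict):
            for key in list(node):  # snapshot: new keys are added while looping
                value = node[key]
                if key in wanted and isinstance(value, str):
                    node[f"{key}_masked"] = encrypt(value)
                else:
                    walk(value)
        elif isinstance(node, list):
            for item in node:
                walk(item)

    def encrypt_fields(json_string: Optional[str]) -> Optional[str]:
        if json_string is None:  # keep null rows null instead of failing
            return None
        doc = json.loads(json_string)
        walk(doc)
        return json.dumps(doc)

    return encrypt_fields


encrypt_fields = make_encrypt_fields(
    ["thisisthevaluetoenrcypt", "thisisanothervaluetoenrcypt"],
    lambda s: f"encrypted_{s}",
)
print(encrypt_fields('{"thisisastruct": {"thisisanothervaluetoenrcypt": "v"}}'))
```

The returned function can then be wrapped exactly as before, e.g. `F.udf(encrypt_fields, StringType())`.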