How can I convert an input string into a dictionary for each row of a column in PySpark?

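I have a DataFrame column that receives string input like the values shown below. For each run of a repeated character, startIndex is the index where the run begins, endIndex is the index where that character's run ends in the string, and flag is the character itself.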

    +---+------------------+
    | id|    Values        |
    +---+------------------+
    |01 |  AABBBAA         |
    |02 |  SSSAAAA         |
    +---+------------------+

Now I want to convert the string into a dictionary for each row, as shown below:

    +---+--------------------+
    | id|    Values          |
    +---+--------------------+
    |01 |  [{"startIndex":0, |
    |   |    "endIndex" : 1, | 
    |   |    "flag" : A },   |
    |   |   {"startIndex":2, |
    |   |    "endIndex" : 4, |
    |   |    "flag" : B },   |
    |   |   {"startIndex":5, |
    |   |    "endIndex" : 6, |
    |   |    "flag" : A }]   |
    |02 |  [{"startIndex":0, |
    |   |    "endIndex" : 2, |
    |   |    "flag" : S },   |
    |   |   {"startIndex":3, |
    |   |    "endIndex" : 6, |
    |   |    "flag" : A }]   |
    +---+--------------------+

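I have pseudocode that builds the dictionary, but I am not sure how to apply it to all rows at once without using a loop. Another problem with this approach is that only the last dictionary built ends up overwriting the value in every row.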


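        import re
        x = "aaabbbbccaa"
        # ((.)\2*) captures a whole run of one repeated character (group 1)
        # and the character itself (group 2)
        xs = re.findall(r"((.)\2*)", x)
        print(xs)
        start = 0
        output = '' 
        for item in xs:
            end = start + (len(item[0])-1)
            startIndex = start
            endIndex = end
            qualityFlag = item[1]
            print(startIndex, endIndex, qualityFlag)
            # the next run starts right after the current one ends
            start = end + 1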

Use udf() to wrap up the code logic and to_json() to convert the array of structs into a string:

    from pyspark.sql.functions import udf, to_json
    import re

    df = spark.createDataFrame([
          ('01', 'AABBBAA')
        , ('02', 'SSSAAAA')
      ] , ['id', 'Values']
    )

    # argument `x` is the StringType() value of the column for one row
    # return `row` as a list of dicts, one per run of identical characters
    @udf('array<struct<startIndex:long,endIndex:long,flag:string>>')
    def set_fields(x):
        row = []
        # (.)\1* matches one character followed by repeats of that same character
        for m in re.finditer(r'(.)\1*', x):
            row.append({
                'startIndex': m.start()
              , 'endIndex': m.end()-1
              , 'flag': m.group(1)
            })
        return row

    df.select('id', to_json(set_fields('Values')).alias('Values')).show(truncate=False)

    +---+----------------------------------------------------------------------------------------------------------------------------+
    |id |Values                                                                                                                      |
    +---+----------------------------------------------------------------------------------------------------------------------------+
    |01 |[{"startIndex":0,"endIndex":1,"flag":"A"},{"startIndex":2,"endIndex":4,"flag":"B"},{"startIndex":5,"endIndex":6,"flag":"A"}]|
    |02 |[{"startIndex":0,"endIndex":2,"flag":"S"},{"startIndex":3,"endIndex":6,"flag":"A"}]                                         |
    +---+----------------------------------------------------------------------------------------------------------------------------+
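
The run-splitting logic the UDF is meant to implement can be checked in plain Python, without a Spark session. The sketch below is only an illustration: the helper name `split_runs` is hypothetical, and returning `None` for a null input is an assumption about how null rows should be handled (as written, `re.finditer` would raise a `TypeError` on `None`).

```python
import re

# Hypothetical helper for testing the logic outside Spark.
def split_runs(s):
    """Return one dict per run of identical consecutive characters in `s`."""
    if s is None:  # assumption: a null column value should stay null
        return None
    return [
        {"startIndex": m.start(), "endIndex": m.end() - 1, "flag": m.group(1)}
        # (.)\1* matches one character followed by repeats of that same character
        for m in re.finditer(r"(.)\1*", s)
    ]

print(split_runs("AABBBAA"))
print(split_runs("SSSAAAA"))
```

If null values can appear in the column, the same `None` guard could be placed at the top of the UDF body before the regex is applied.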