PySpark XML to JSON w/ Time Series Data
I have close to 500k XML files containing time series data, each about 2-3 MB and each holding roughly 10k rows of time series data. The idea is to convert the XML file for each unique ID into JSON. However, the time series data for each ID needs to be split into batches of 10 rows, converted to JSON, and written to a NoSQL database. Initially, the code iterated over a monolithic dataframe for each ID in steps of 10 rows and then wrote each document to the database:
import pandas as pd

# df is the time series DataFrame for one ID; soup is that ID's XML parsed with BeautifulSoup
def resample_idx(X, resample_rate):
    # yield consecutive slices of resample_rate rows
    for idx in range(0, len(X), resample_rate):
        yield X.iloc[idx:idx + resample_rate, :]

# Batch Documents
for idx, df_batch in enumerate(resample_idx(df, 10)):
    dict_ = {}
    dict_['id'] = soup.find('id').contents[0]
    dict_['data'] = [v for k, v in pd.DataFrame.to_dict(df_batch.T).items()]
A sample of the JSON documents looks like this:
{'id':123456A,
'data': [{'A': 251.23,
'B': 130.56,
'dtim': Timestamp('2011-03-24 11:18:13.350000')
},
{
'A': 253.23,
'B': 140.56,
'dtim': Timestamp('2011-03-24 11:19:21.310000')
},
.........
]
},
{'id':123593X,
'data': [{'A': 641.13,
'B': 220.51,
'C': 10.45,
'dtim': Timestamp('2011-03-26 12:11:13.350000')
},
{
'A': 153.25,
'B': 810.16,
'C': 12.5,
'dtim': Timestamp('2011-03-26 12:11:13.310000')
},
.........
]
}
This works for small samples, but I quickly realized it won't scale when creating the batches, so I'd like to replicate it in Spark. My experience with Spark is limited, but here is what I've attempted so far:
First, get all the time series data for all IDs:
df = sqlContext.read.format("com.databricks.spark.xml").options(rowTag='log').load("dbfs:/mnt/timedata/")
XML schema:
|-- _id: string (nullable = true)
|-- collect_list(TimeData): array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- data: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- ColNames: string (nullable = true)
| | |-- Units: string (nullable = true)
SQL query to get a Spark DataFrame:
d = df.select("_id","TimeData.data",'TimeData.ColNames')
Current Spark DataFrame:
+--------------------+--------------------+--------------------+
| id | data| ColNames|
+--------------------+--------------------+--------------------+
|123456A |[2011-03-24 11:18...|dTim,A,B |
|123456A |[2011-03-24 11:19...|dTim,A,B |
|123593X |[2011-03-26 12:11...|dTim,A,B,C |
|123593X |[2011-03-26 12:11...|dTim,A,B,C |
+--------------------+--------------------+--------------------+
Expected Spark DataFrame:
+--------------------+--------------------+----------+----------+
| id | dTime| A| B|
+--------------------+--------------------+----------+----------+
|123456A |2011-03-24 11:18... | 251.23| 130.56|
|123456A |2011-03-24 11:19... | 253.23| 140.56|
+--------------------+--------------------+----------+----------+
+--------------------+--------------------+----------+----------+----------+
| id | dTime| A| B| C|
+--------------------+--------------------+----------+----------+----------+
|123593X |2011-03-26 12:11... | 641.13| 220.51| 10.45|
|123593X |2011-03-26 12:11... | 153.25| 810.16| 12.5|
+--------------------+--------------------+----------+----------+----------+
I've only shown data for two timestamps here, but how can I turn the DataFrame above into batched JSON files of every n rows (per id), similar to what was done with Pandas above? My initial thought was to do a groupBy and apply a UDF to each ID? The output would look like the JSON structure above.
XML structure:
<log>
<id>"ABC"</id>
<TimeData>
<colNames>dTim,colA,colB,colC,</colNames>
<data>2011-03-24T11:18:13.350Z,0.139,38.988,0,110.307</data>
<data>2011-03-24T11:18:43.897Z,0.138,39.017,0,110.307</data>
</TimeData>
</log>
Note that there is no fixed number of colNames per ID; it can be anywhere between 5 and 30, depending on the data sources collected for that ID.
Well, based on the information given, this might be a solution. Unfortunately my Python is a bit rusty, but there should be Python equivalents for all the Scala functions here:
import spark.implicits._  // assuming a SparkSession named `spark` is in scope
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, TimestampType}

// Assume nth is based on dTim ordering
val windowSpec = Window
.partitionBy($"_id")
.orderBy($"dTim".desc)
val nthRow = 2 // define the nthItem to be fetched
df.select(
$"_id",
$"TimeData.data".getItem(0).getItem(0).cast(TimestampType).alias("dTim"),
$"TimeData.data".getItem(0).getItem(1).cast(DoubleType).alias("A"),
$"TimeData.data".getItem(0).getItem(2).cast(DoubleType).alias("B"),
$"TimeData.data".getItem(0).getItem(3).cast(DoubleType).alias("C")
).withColumn("n", row_number().over(windowSpec))
.filter(col("n") === nthRow)
.drop("n")
.show()
This will output something like:
+-------+--------------------+------+------+-----+
| _id| dTim| A| B| C|
+-------+--------------------+------+------+-----+
|123456A|2011-03-24 11:18:...|251.23|130.56| null|
|123593X|2011-03-26 12:11:...|641.13|220.51|10.45|
+-------+--------------------+------+------+-----+
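Since the question is in PySpark, a rough, untested sketch of the same selection in PySpark might look like this (the positions inside TimeData.data are assumptions carried over from the Scala version):

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, TimestampType
from pyspark.sql.window import Window

# Assume nth is based on dTim ordering, as above
window_spec = Window.partitionBy("_id").orderBy(F.col("dTim").desc())
nth_row = 2  # the nth item to be fetched

(df.select(
        "_id",
        F.col("TimeData.data").getItem(0).getItem(0).cast(TimestampType()).alias("dTim"),
        F.col("TimeData.data").getItem(0).getItem(1).cast(DoubleType()).alias("A"),
        F.col("TimeData.data").getItem(0).getItem(2).cast(DoubleType()).alias("B"),
        F.col("TimeData.data").getItem(0).getItem(3).cast(DoubleType()).alias("C"))
    .withColumn("n", F.row_number().over(window_spec))
    .filter(F.col("n") == nth_row)
    .drop("n")
    .show())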
I'll improve the answer if I get to know more.
UPDATE
I liked this puzzle, so if I understand the problem correctly, this could be a solution:
I created 3 XML files, each with 2 data records, for a total of 2 distinct IDs.
val df = spark
.sqlContext
.read
.format("com.databricks.spark.xml")
.option("rowTag", "log")
.load("src/main/resources/xml")
// Could be computationally heavy; maybe cache df first if possible, otherwise run it on a sample, or just hardcode the possible columns
val colNames = df
.select(explode(split($"TimeData.colNames",",")).as("col"))
.distinct()
.filter($"col" =!= lit("dTim") && $"col" =!= "")
.collect()
.map(_.getString(0))
.toList
.sorted
// or list all possible columns
//val colNames = List("colA", "colB", "colC")
// Based on the XML, colNames and data are comma-separated strings that have to be split. This could be done with the sql split function, but this UDF maps the columns to the correct fields
def mapColsToData = udf((cols:String, data:Seq[String]) =>
if(cols == null || data == null) Seq.empty[Map[String, String]]
else {
data.map(str => (cols.split(",") zip str.split(",")).toMap)
}
)
// The result of this action is 1 record for each datapoint across all XMLs. Each data record is a key->value map of colName -> data
val denorm = df.select($"id", explode(mapColsToData($"TimeData.colNames", $"TimeData.data")).as("data"))
denorm.show(false)
Output:
+-------+-------------------------------------------------------------------------------+
|id |data |
+-------+-------------------------------------------------------------------------------+
|123456A|Map(dTim -> 2011-03-24T11:18:13.350Z, colA -> 0.139, colB -> 38.988, colC -> 0)|
|123456A|Map(dTim -> 2011-03-24T11:18:43.897Z, colA -> 0.138, colB -> 39.017, colC -> 0)|
|123593X|Map(dTim -> 2011-03-26T11:20:13.350Z, colA -> 1.139, colB -> 28.988) |
|123593X|Map(dTim -> 2011-03-26T11:20:43.897Z, colA -> 1.138, colB -> 29.017) |
|123456A|Map(dTim -> 2011-03-27T11:18:13.350Z, colA -> 0.129, colB -> 35.988, colC -> 0)|
|123456A|Map(dTim -> 2011-03-27T11:18:43.897Z, colA -> 0.128, colB -> 35.017, colC -> 0)|
+-------+-------------------------------------------------------------------------------+
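A possible PySpark equivalent of this UDF step, as an untested sketch (types and names follow the Scala snippet above):

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, MapType, StringType

# Split the comma-separated colNames and data strings and zip them into
# one colName -> value map per data point (empty list on nulls)
@F.udf(ArrayType(MapType(StringType(), StringType())))
def map_cols_to_data(cols, data):
    if cols is None or data is None:
        return []
    names = cols.split(",")
    return [dict(zip(names, row.split(","))) for row in data]

denorm = df.select(
    F.col("id"),
    F.explode(map_cols_to_data(F.col("TimeData.colNames"), F.col("TimeData.data"))).alias("data"))
denorm.show(truncate=False)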
// now create a column for each map value, based on the predefined / discovered column names
val columized = denorm.select(
$"id",
$"data.dTim".cast(TimestampType).alias("dTim"),
$"data"
)
columized.show()
Output:
+-------+--------------------+--------------------+
| id| dTim| data|
+-------+--------------------+--------------------+
|123456A|2011-03-24 12:18:...|Map(dTim -> 2011-...|
|123456A|2011-03-24 12:18:...|Map(dTim -> 2011-...|
|123593X|2011-03-26 12:20:...|Map(dTim -> 2011-...|
|123593X|2011-03-26 12:20:...|Map(dTim -> 2011-...|
|123456A|2011-03-27 13:18:...|Map(dTim -> 2011-...|
|123456A|2011-03-27 13:18:...|Map(dTim -> 2011-...|
+-------+--------------------+--------------------+
// create window over which to resample
val windowSpec = Window
.partitionBy($"id")
.orderBy($"dTim".desc)
val resampleRate = 2
// add batchId based on the resample rate, then group by id and batchId and collect the data per batch
val batched = columized
.withColumn("batchId", floor((row_number().over(windowSpec) - lit(1)) / lit(resampleRate)))
.groupBy($"id", $"batchId")
.agg(collect_list($"data").as("data"))
.drop("batchId")
batched.show(false)
Output:
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |data |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|123593X|[Map(dTim -> 2011-03-26T11:20:43.897Z, colA -> 1.138, colB -> 29.017), Map(dTim -> 2011-03-26T11:20:13.350Z, colA -> 1.139, colB -> 28.988)] |
|123456A|[Map(dTim -> 2011-03-27T11:18:43.897Z, colA -> 0.128, colB -> 35.017, colC -> 0), Map(dTim -> 2011-03-27T11:18:13.350Z, colA -> 0.129, colB -> 35.988, colC -> 0)]|
|123456A|[Map(dTim -> 2011-03-24T11:18:43.897Z, colA -> 0.138, colB -> 39.017, colC -> 0), Map(dTim -> 2011-03-24T11:18:13.350Z, colA -> 0.139, colB -> 38.988, colC -> 0)]|
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// Store as 1 huge json file (drop the repartition if you can handle multiple json files; that is also better for the driver)
batched.repartition(1).write.mode(SaveMode.Overwrite).json("/tmp/xml")
Output JSON:
{"id":"123593X","data":[{"dTim":"2011-03-26T12:20:43.897+01:00","colA":"1.138","colB":"29.017"},{"dTim":"2011-03-26T12:20:13.350+01:00","colA":"1.139","colB":"28.988"}]}
{"id":"123456A","data":[{"dTim":"2011-03-27T13:18:43.897+02:00","colA":"0.128","colB":"35.017","colC":"0"},{"dTim":"2011-03-27T13:18:13.350+02:00","colA":"0.129","colB":"35.988","colC":"0"}]}
{"id":"123456A","data":[{"dTim":"2011-03-24T12:18:43.897+01:00","colA":"0.138","colB":"39.017","colC":"0"},{"dTim":"2011-03-24T12:18:13.350+01:00","colA":"0.139","colB":"38.988","colC":"0"}]}
Here is another approach that doesn't rely on hard-coded column names. Basically, the idea is to explode the data and ColNames columns to get a 'melted' DF, which we can then pivot to get the form you want:
from pyspark.sql import functions as f

# define function that processes elements of the rdd
# underlying the DF to get a melted RDD
def process(row, cols):
    """cols is the list of target columns to explode"""
    row = row.asDict()
    exploded = [[row['id']] + list(elt) for elt in zip(*[row[col] for col in cols])]
    return exploded

# Now split ColNames:
df = df.withColumn('col_split', f.split('ColNames', ","))

# define target cols to explode; each element of each col
# can be of different length
cols = ['data', 'col_split']

# apply the function and flatMap the results to get a melted RDD/DF
df = df.select(['id'] + cols).rdd\
    .flatMap(lambda row: process(row, cols))\
    .toDF(schema=['id', 'value', 'name'])

# Pivot to get the required form
df.groupby('id').pivot('name').agg(f.max('value')).show()
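If you then still need batches of 10 rows per id, one option (an untested sketch) is to reuse the window/batch idea from the other answer on a flat DataFrame with one row per id and timestamp, as in your expected DataFrame; the flat variable below is hypothetical:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

# `flat` is a hypothetical DataFrame with columns (id, dTim, A, B, ...),
# one row per id and timestamp, like the expected DataFrame in the question
batch_size = 10
w = Window.partitionBy('id').orderBy('dTim')
value_cols = [c for c in flat.columns if c != 'id']

batched = (flat
    .withColumn('batchId', f.floor((f.row_number().over(w) - f.lit(1)) / f.lit(batch_size)))
    .groupBy('id', 'batchId')
    .agg(f.collect_list(f.struct(*value_cols)).alias('data'))
    .drop('batchId'))

# each output record looks like {"id": ..., "data": [{...}, {...}, ...]}
batched.write.mode('overwrite').json('/tmp/json_batches')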