Convert a dataframe with several list columns into separate columns in Spark Structured Streaming
I have the following problem: in Spark Structured Streaming I have a dataframe whose columns contain lists of dictionaries. The schema I created for this data structure is:
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

tick_by_tick_schema = StructType([
StructField('localSymbol', StringType()),
StructField('tickByTicks', ArrayType(StructType([
StructField('price', StringType()),
StructField('size', StringType()),
StructField('specialConditions', StringType()),
]))),
StructField('domBids', ArrayType(StructType([
StructField('price_bid', StringType()),
StructField('size_bid', StringType()),
StructField('marketMaker_bid', StringType()),
]))),
StructField('domAsks', ArrayType(StructType([
StructField('price_ask', StringType()),
StructField('size_ask', StringType()),
StructField('marketMaker_ask', StringType()),
])))
])
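For context, this schema would typically be applied while parsing the incoming stream. A minimal sketch, assuming the messages arrive as JSON on a Kafka topic (the broker address, topic name, and variable names are placeholder assumptions, not from the original post):

from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.appName('tick_by_tick').getOrCreate()

# assumed source: JSON messages on a Kafka topic (placeholder names)
raw = (spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('subscribe', 'ticks')
    .load())

# parse the Kafka value bytes with the schema above and flatten the struct
tick_by_tick_data_processed = (raw
    .select(f.from_json(f.col('value').cast('string'), tick_by_tick_schema).alias('data'))
    .select('data.*'))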
My dataframe looks like this:
+-----------+------------------+----------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+
|localSymbol|tickByTicks |domBids |domAsks |
+-----------+------------------+----------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+
|BABA |[{213.73, 134, T}]|[{213.51, 1, ARCA}, {213.51, 1, NSDQ}, {213.5, 12, NSDQ}, {213.06, 1, ARCA}, {213.01, 10, DRCTEDGE}]|[{213.75, 45, ARCA}, {213.95, 1, DRCTEDGE}, {214.0, 1, ARCA}, {214.0, 1, NSDQ}, {214.1, 1, NSDQ}]|
+-----------+------------------+----------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+
What I want to get is this:
+-----------+------+---------+---------+
|localSymbol|price |price_bid|price_ask|
+-----------+------+---------+---------+
|BABA |213.73|213.51 |213.75 |
|BABA |213.73|213.51 |213.95 |
|BABA |213.73|213.5 |214.0 |
|BABA |213.73|213.06 |214.0 |
|BABA |213.73|213.01 |214.1 |
+-----------+------+---------+---------+
I tried this:
df = self.tick_by_tick_data_processed\
.withColumn('price', f.explode(f.col('tickByTicks.price'))) \
.withColumn('price_ask', f.explode(f.col('domAsks.price_ask'))) \
.withColumn('price_bid', f.explode(f.col('domBids.price_bid'))) \
.select('localSymbol', 'price', 'price_bid', 'price_ask')
But it does not work: each explode multiplies the existing rows, so chaining the three explode calls produces the cross product of the arrays instead of pairing their elements by position. I also do not want to group by a time window.
Can you help me?
Thanks!
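To see the problem concretely, here is a minimal static reproduction (an editorial sketch, not from the original post): with one tick, two bids and two asks, the chained explodes yield 1 x 2 x 2 = 4 rows instead of 2.

from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.appName('explode_demo').getOrCreate()

df = spark.createDataFrame(
    [('BABA', ['213.73'], ['213.51', '213.5'], ['213.75', '213.95'])],
    ['localSymbol', 'prices', 'bids', 'asks'])

# each explode multiplies the current rows by the length of the array
out = (df.withColumn('price', f.explode('prices'))
    .withColumn('price_bid', f.explode('bids'))
    .withColumn('price_ask', f.explode('asks')))

print(out.count())  # 4, not 2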
My solution is:
# posexplode keeps each element's position in the array, so bids and asks
# can later be matched index by index
df1 = self.tick_by_tick_data_processed\
    .select(
        f.col('localSymbol').alias('localSymbol_bids'),
        f.col('time').cast('string').alias('time_bids'),
        f.posexplode('domBids.price_bid').alias('id_bids', 'price_bids')
    )
df2 = self.tick_by_tick_data_processed\
    .select(
        f.col('localSymbol').alias('localSymbol_asks'),
        f.col('time').cast('string').alias('time_asks'),
        f.posexplode('domAsks.price_ask').alias('id_asks', 'price_asks')
    )

# join the exploded bids and asks on position, symbol and timestamp
join_expr = "id_bids = id_asks AND localSymbol_bids = localSymbol_asks AND time_bids = time_asks"
join_type = "inner"
join_df = df1.join(df2, f.expr(join_expr), join_type)
The result is:
+----------------+--------------------------+-------+----------+----------------+--------------------------+-------+----------+
|localSymbol_bids|time_bids |id_bids|price_bids|localSymbol_asks|time_asks |id_asks|price_asks|
+----------------+--------------------------+-------+----------+----------------+--------------------------+-------+----------+
|BABA |2021-06-10 13:23:50.701279|0 |219.35 |BABA |2021-06-10 13:23:50.701279|0 |213.45 |
|BABA |2021-06-10 13:23:50.701279|1 |214.0 |BABA |2021-06-10 13:23:50.701279|1 |213.46 |
|BABA |2021-06-10 13:23:50.701279|4 |213.5 |BABA |2021-06-10 13:23:50.701279|4 |213.5 |
|BABA |2021-06-10 13:23:50.701279|2 |213.6 |BABA |2021-06-10 13:23:50.701279|2 |213.5 |
|BABA |2021-06-10 13:23:50.701279|3 |213.55 |BABA |2021-06-10 13:23:50.701279|3 |213.5 |
+----------------+--------------------------+-------+----------+----------------+--------------------------+-------+----------+
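For what it is worth, the self-join can be avoided entirely: arrays_zip pairs the two arrays element by element (padding the shorter one with nulls if the lengths differ), and a single explode then emits one row per position. A minimal sketch of that alternative, assuming the same schema as above (the variable name tick_by_tick_data_processed is reused for illustration):

from pyspark.sql import functions as f

# zip bids and asks positionally, then explode the zipped array once
df = (tick_by_tick_data_processed
    .withColumn('pair', f.explode(f.arrays_zip('domBids', 'domAsks')))
    .select(
        'localSymbol',
        f.col('tickByTicks')[0]['price'].alias('price'),
        f.col('pair.domBids.price_bid').alias('price_bid'),
        f.col('pair.domAsks.price_ask').alias('price_ask')))

Because this stays within a single dataframe, it needs no join state or watermark, which keeps the streaming query simpler.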