Sum of array of dictionaries depending on value condition pyspark (spark structured streaming)
I have the following schema:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

tick_by_tick_schema = StructType([
    StructField('localSymbol', StringType()),
    StructField('time', StringType()),
    StructField('open', StringType()),
    StructField('previous_price', StringType()),
    StructField('tickByTicks', ArrayType(StructType([
        StructField('price', StringType()),
        StructField('size', StringType()),
        StructField('specialConditions', StringType()),
    ])))
])
I have the following DataFrame (in Spark Structured Streaming):
+-----------+--------------------------------+--------------+----------------------------------------------------+
|localSymbol|time |previous_price|tickByTicks |
+-----------+--------------------------------+--------------+----------------------------------------------------+
|BABA |2021-06-10 19:25:38.154245+00:00|213.76 |[{213.75, 100, }] |
|BABA |2021-06-10 19:25:38.155229+00:00|213.76 |[{213.75, 100, }, {213.78, 100, }, {213.78, 200, }] |
|BABA |2021-06-10 19:25:39.662033+00:00|213.73 |[{213.72, 100, }] |
|BABA |2021-06-10 19:25:39.662655+00:00|213.72 |[{213.72, 100, }, {213.73, 100, }] |
+-----------+--------------------------------+--------------+----------------------------------------------------+
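For context, this streaming DataFrame is presumably parsed from Kafka with from_json against the schema above; a minimal sketch, where the bootstrap servers and topic name are placeholders rather than part of the original setup:

import pyspark.sql.functions as f

kafka_df_structured_with_tick_by_tick_data_values = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder
    .option("subscribe", "tick_by_tick")                   # placeholder topic
    .load()
    # Kafka delivers the value as binary: cast to string, parse the JSON, and flatten.
    .select(f.from_json(f.col("value").cast("string"), tick_by_tick_schema).alias("data"))
    .select("data.*")
)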
I want to create two columns based on the following logic:
Column_low: WHEN tickByTicks.price < previous_price THEN sum(tickByTicks.size)
Column_high: WHEN tickByTicks.price > previous_price THEN sum(tickByTicks.size)
The result would be (e.g., for the second row, the tick at 213.75 < 213.76 contributes 100 to Column_low, while the two ticks at 213.78 > 213.76 contribute 100 + 200 = 300 to Column_high):
+-----------+--------------------------------+--------------+----------------------------------------------------+----------+-----------+
|localSymbol|time |previous_price|tickByTicks |Column_low|Column_high|
+-----------+--------------------------------+--------------+----------------------------------------------------+----------+-----------+
|BABA |2021-06-10 19:25:38.154245+00:00|213.76 |[{213.75, 100, }] |100 |0 |
|BABA |2021-06-10 19:25:38.155229+00:00|213.76 |[{213.75, 100, }, {213.78, 100, }, {213.78, 200, }] |100 |300 |
|BABA |2021-06-10 19:25:39.662033+00:00|213.73 |[{213.72, 100, }] |100 |0 |
|BABA |2021-06-10 19:25:39.662655+00:00|213.72 |[{213.72, 100, }, {213.73, 100, }] |0 |100 |
+-----------+--------------------------------+--------------+----------------------------------------------------+----------+-----------+
I have tried something like this, but it did not produce the expected result:
tick_by_tick_data_processed = kafka_df_structured_with_tick_by_tick_data_values.select(
    f.col('localSymbol'),
    f.col('time'),
    f.col('previous_price'),
    f.col('tickByTicks'),
    f.expr("aggregate(filter(tickByTicks.size, x -> x > previous_price), 0D, (x, acc) -> acc + x)")
).show(30, False)
I cannot test my solution, but I think this might work:
tick_by_tick_data_processed = kafka_df_structured_with_tick_by_tick_data_values.select(
    f.col('localSymbol'),
    f.col('time'),
    f.col('previous_price'),
    f.col('tickByTicks'),
    f.expr("aggregate(tickByTicks, 0D, (acc, tick) -> IF(tick.price < previous_price, acc + tick.size, acc))").alias("Column_low"),
    f.expr("aggregate(tickByTicks, 0D, (acc, tick) -> IF(tick.price > previous_price, acc + tick.size, acc))").alias("Column_high"))
This can be done using the explode and sum functions:
from pyspark.sql.functions import explode, monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

data = [
    ("BABA", "2021-06-10 19:25:38.154245+00:00", "213.76", [("213.75", "100")]),
    ("BABA", "2021-06-10 19:25:38.155229+00:00", "213.76", [("213.75", "100"), ("213.78", "100"), ("213.78", "200")]),
    ("BABA", "2021-06-10 19:25:39.662033+00:00", "213.73", [("213.72", "100")]),
    ("BABA", "2021-06-10 19:25:39.662655+00:00", "213.72", [("213.72", "100"), ("213.73", "100")]),
]

tick_by_tick_schema = StructType([
    StructField('localSymbol', StringType()),
    StructField('time', StringType()),
    StructField('previous_price', StringType()),
    StructField('tickByTicks', ArrayType(StructType([
        StructField('price', StringType()),
        StructField('size', StringType())
    ])))
])

df = spark.createDataFrame(data=data, schema=tick_by_tick_schema)

# Tag each row so identical rows are not collapsed by the later group-by.
df = df.withColumn("idx", monotonically_increasing_id())

# One row per element of the tickByTicks array.
df = df.withColumn("col3", explode(df.tickByTicks))
df.createOrReplaceTempView("calc")

spark.sql("""
    SELECT localSymbol, time, previous_price, idx, tickByTicks,
           SUM(CASE WHEN col3.price < previous_price THEN col3.size ELSE 0 END) AS Column_low,
           SUM(CASE WHEN col3.price > previous_price THEN col3.size ELSE 0 END) AS Column_high
    FROM calc
    GROUP BY localSymbol, time, previous_price, idx, tickByTicks
""").drop("idx").show(truncate=0)
Result:
+-----------+--------------------------------+--------------+---------------------------------------------+----------+-----------+
|localSymbol|time                            |previous_price|tickByTicks                                  |Column_low|Column_high|
+-----------+--------------------------------+--------------+---------------------------------------------+----------+-----------+
|BABA       |2021-06-10 19:25:39.662033+00:00|213.73        |[[213.72, 100]]                              |100.0     |0.0        |
|BABA       |2021-06-10 19:25:38.154245+00:00|213.76        |[[213.75, 100]]                              |100.0     |0.0        |
|BABA       |2021-06-10 19:25:39.662655+00:00|213.72        |[[213.72, 100], [213.73, 100]]               |0.0       |100.0      |
|BABA       |2021-06-10 19:25:38.155229+00:00|213.76        |[[213.75, 100], [213.78, 100], [213.78, 200]]|100.0     |300.0      |
+-----------+--------------------------------+--------------+---------------------------------------------+----------+-----------+
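The same explode-and-sum idea can also be written with the DataFrame API instead of a temp view; a sketch against the sample df above (note that, unlike the per-row aggregate expression, a groupBy like this becomes a streaming aggregation when applied to the streaming DataFrame, with the usual output-mode implications):

from pyspark.sql.functions import col, explode, monotonically_increasing_id, when
from pyspark.sql.functions import sum as sum_

result = (
    df.withColumn("idx", monotonically_increasing_id())       # keep otherwise-identical rows apart
      .withColumn("tick", explode(col("tickByTicks")))        # one row per tick
      .groupBy("localSymbol", "time", "previous_price", "idx", "tickByTicks")
      .agg(
          sum_(when(col("tick.price") < col("previous_price"), col("tick.size")).otherwise(0)).alias("Column_low"),
          sum_(when(col("tick.price") > col("previous_price"), col("tick.size")).otherwise(0)).alias("Column_high"),
      )
      .drop("idx")
)
result.show(truncate=False)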