group by per day pyspark
I have a PySpark DataFrame:
From_Id  To_Id  Price  Date
a b 20 30/05/2019
b c 5 30/05/2019
c a 20 30/05/2019
a d 10 02/06/2019
d c 5 02/06/2019
id Name
a Claudia
b Manuella
c remy
d Paul
The output I want is:
Date Name current balance
30/05/2019 Claudia 0
30/05/2019 Manuella 15
30/05/2019 Remy -15
30/05/2019 Paul 0
02/06/2019 Claudia -10
02/06/2019 Manuella 15
02/06/2019 Remy -10
02/06/2019 Paul 5
I want the current balance of every user, per day.
My idea was to group by user and compute the sum of the To column minus the sum of the From column. But how do I do that per day? Especially since the balance is cumulative rather than per-day?
Thanks
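The "cumulative rather than per-day" part is exactly what a window function provides: a running sum per user, ordered by date. A minimal sketch of just that piece, assuming a hypothetical daily_balance DataFrame that already holds one row per user per day with columns id, Date, and balance:

import pyspark.sql.functions as F
from pyspark.sql import Window

# hypothetical daily_balance: one row per (id, Date) with that day's net balance
running = (Window.partitionBy("id")
                 .orderBy("Date")
                 .rowsBetween(Window.unboundedPreceding, Window.currentRow))
cumulative = daily_balance.withColumn("current_balance", F.sum("balance").over(running))

The answer below builds exactly such a per-day balance table and then applies this kind of window.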
I put some effort into meeting the requirement. Here is my version of the solution.
import sys  # needed for the unbounded window frame rowsBetween(-sys.maxsize, 0) below
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SQLContext
import pyspark.sql.functions as F
from pyspark.sql import Window

sc = SparkContext('local')
sqlContext = SQLContext(sc)
# transactions: sender, receiver, amount, date
data1 = [
("a","b",20,"30/05/2019"),
("b","c",5 ,"30/05/2019"),
("c","a",20,"30/05/2019"),
("a","d",10,"02/06/2019"),
("d","c",5 ,"02/06/2019"),
]
df1Columns = ["From_Id", "To_Id", "Price", "Date"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)
# parse the dd/MM/yyyy strings into proper date values
df1 = df1.withColumn("Date", F.to_date(F.to_timestamp("Date", 'dd/MM/yyyy')))
print("Actual initial data")
df1.show(truncate=False)
# user id to name mapping
data2 = [
("a","Claudia"),
("b","Manuella"),
("c","Remy"),
("d","Paul"),
]
df2Columns = ["id", "Name"]
df2 = sqlContext.createDataFrame(data=data2, schema=df2Columns)
print("Actual initial data")
df2.show(truncate=False)
# all (day, user) combinations with an initial balance of 0, so that users
# with no transactions on a given day still get a row
alldays_df = df1.select("Date").distinct().repartition(20)
allusers_df = df2.select("id").distinct().repartition(10)
crossjoin_df = alldays_df.crossJoin(allusers_df)
crossjoin_df = crossjoin_df.withColumn("initial", F.lit(0))
crossjoin_df = crossjoin_df.withColumnRenamed("id", "common_id").cache()
crossjoin_df.show(n=40, truncate=False)
# total amount each user sent per day
from_sum_df = df1.groupby("Date", "From_Id").agg(F.sum("Price").alias("from_sum"))
from_sum_df = from_sum_df.withColumnRenamed("From_Id", "common_id")
from_sum_df.show(truncate=False)
# fill in 0 for (day, user) pairs with no outgoing transactions
from_sum_df = crossjoin_df.alias('cross').join(
    from_sum_df.alias('from'), ['Date', 'common_id'], how='outer'
).select('Date', 'common_id',
         F.coalesce('from.from_sum', 'cross.initial').alias('from_amount')).cache()
from_sum_df.show(truncate=False)
# total amount each user received per day
to_sum_df = df1.groupby("Date", "To_Id").agg(F.sum("Price").alias("to_sum"))
to_sum_df = to_sum_df.withColumnRenamed("To_Id", "common_id")
to_sum_df.show(truncate=False)
# fill in 0 for (day, user) pairs with no incoming transactions
to_sum_df = crossjoin_df.alias('cross').join(
    to_sum_df.alias('to'), ['Date', 'common_id'], how='outer'
).select('Date', 'common_id',
         F.coalesce('to.to_sum', 'cross.initial').alias('to_amount')).cache()
to_sum_df.show(truncate=False)
# combine incoming and outgoing amounts per (day, user)
joined_df = to_sum_df.join(from_sum_df, ["Date", "common_id"], how='inner')
joined_df.show(truncate=False)
# per-day balance: received minus sent
balance_df = joined_df.withColumn("balance", F.col("to_amount") - F.col("from_amount"))
balance_df.show(truncate=False)
# attach user names
final_df = balance_df.join(df2, F.col("id") == F.col("common_id"))
final_df.show(truncate=False)
# running (cumulative) balance per user, ordered by date
cum_window = Window.partitionBy('common_id').orderBy('Date').rowsBetween(-sys.maxsize, 0)
final_cum_sum = final_df.withColumn('cumsum_balance', F.sum('balance').over(cum_window))
final_cum_sum.show()
Below are all the intermediate outputs so you can follow along step by step. I won't explain each one; you can work through them yourself.
Actual initial data
+-------+-----+-----+----------+
|From_Id|To_Id|Price|Date |
+-------+-----+-----+----------+
|a |b |20 |2019-05-30|
|b |c |5 |2019-05-30|
|c |a |20 |2019-05-30|
|a |d |10 |2019-06-02|
|d |c |5 |2019-06-02|
+-------+-----+-----+----------+
Actual initial data
+---+--------+
|id |Name |
+---+--------+
|a |Claudia |
|b |Manuella|
|c |Remy |
|d |Paul |
+---+--------+
+----------+---------+-------+
|Date |common_id|initial|
+----------+---------+-------+
|2019-05-30|a |0 |
|2019-05-30|d |0 |
|2019-05-30|b |0 |
|2019-05-30|c |0 |
|2019-06-02|a |0 |
|2019-06-02|d |0 |
|2019-06-02|b |0 |
|2019-06-02|c |0 |
+----------+---------+-------+
+----------+---------+--------+
|Date |common_id|from_sum|
+----------+---------+--------+
|2019-06-02|a |10 |
|2019-05-30|a |20 |
|2019-06-02|d |5 |
|2019-05-30|c |20 |
|2019-05-30|b |5 |
+----------+---------+--------+
+----------+---------+-----------+
|Date |common_id|from_amount|
+----------+---------+-----------+
|2019-06-02|a |10 |
|2019-06-02|c |0 |
|2019-05-30|a |20 |
|2019-05-30|d |0 |
|2019-06-02|b |0 |
|2019-06-02|d |5 |
|2019-05-30|c |20 |
|2019-05-30|b |5 |
+----------+---------+-----------+
+----------+---------+------+
|Date |common_id|to_sum|
+----------+---------+------+
|2019-06-02|c |5 |
|2019-05-30|a |20 |
|2019-06-02|d |10 |
|2019-05-30|c |5 |
|2019-05-30|b |20 |
+----------+---------+------+
+----------+---------+---------+
|Date |common_id|to_amount|
+----------+---------+---------+
|2019-06-02|a |0 |
|2019-06-02|c |5 |
|2019-05-30|a |20 |
|2019-05-30|d |0 |
|2019-06-02|b |0 |
|2019-06-02|d |10 |
|2019-05-30|c |5 |
|2019-05-30|b |20 |
+----------+---------+---------+
+----------+---------+---------+-----------+
|Date |common_id|to_amount|from_amount|
+----------+---------+---------+-----------+
|2019-06-02|a |0 |10 |
|2019-06-02|c |5 |0 |
|2019-05-30|a |20 |20 |
|2019-05-30|d |0 |0 |
|2019-06-02|b |0 |0 |
|2019-06-02|d |10 |5 |
|2019-05-30|c |5 |20 |
|2019-05-30|b |20 |5 |
+----------+---------+---------+-----------+
+----------+---------+---------+-----------+-------+
|Date |common_id|to_amount|from_amount|balance|
+----------+---------+---------+-----------+-------+
|2019-06-02|a |0 |10 |-10 |
|2019-06-02|c |5 |0 |5 |
|2019-05-30|a |20 |20 |0 |
|2019-05-30|d |0 |0 |0 |
|2019-06-02|b |0 |0 |0 |
|2019-06-02|d |10 |5 |5 |
|2019-05-30|c |5 |20 |-15 |
|2019-05-30|b |20 |5 |15 |
+----------+---------+---------+-----------+-------+
+----------+---------+---------+-----------+-------+---+--------+
|Date |common_id|to_amount|from_amount|balance|id |Name |
+----------+---------+---------+-----------+-------+---+--------+
|2019-05-30|a |20 |20 |0 |a |Claudia |
|2019-06-02|a |0 |10 |-10 |a |Claudia |
|2019-05-30|b |20 |5 |15 |b |Manuella|
|2019-06-02|b |0 |0 |0 |b |Manuella|
|2019-05-30|c |5 |20 |-15 |c |Remy |
|2019-06-02|c |5 |0 |5 |c |Remy |
|2019-06-02|d |10 |5 |5 |d |Paul |
|2019-05-30|d |0 |0 |0 |d |Paul |
+----------+---------+---------+-----------+-------+---+--------+
+----------+---------+---------+-----------+-------+---+--------+--------------+
| Date|common_id|to_amount|from_amount|balance| id| Name|cumsum_balance|
+----------+---------+---------+-----------+-------+---+--------+--------------+
|2019-05-30| d| 0| 0| 0| d| Paul| 0|
|2019-06-02| d| 10| 5| 5| d| Paul| 5|
|2019-05-30| c| 5| 20| -15| c| Remy| -15|
|2019-06-02| c| 5| 0| 5| c| Remy| -10|
|2019-05-30| b| 20| 5| 15| b|Manuella| 15|
|2019-06-02| b| 0| 0| 0| b|Manuella| 15|
|2019-05-30| a| 20| 20| 0| a| Claudia| 0|
|2019-06-02| a| 0| 10| -10| a| Claudia| -10|
+----------+---------+---------+-----------+-------+---+--------+--------------+
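For what it's worth, the same result can be reached more compactly by turning each transfer into two signed ledger rows (-Price for the sender, +Price for the receiver) and letting a single window do the cumulative work. This is my own shortcut, not part of the solution above; it is a sketch that reuses the df1 and df2 DataFrames defined earlier:

# each transfer becomes two signed ledger rows
sent = df1.select(F.col("From_Id").alias("id"), "Date", (-F.col("Price")).alias("delta"))
received = df1.select(F.col("To_Id").alias("id"), "Date", F.col("Price").alias("delta"))
ledger = sent.unionByName(received)

# every (Date, id) pair, so users with no activity on a day still get a row
all_pairs = df1.select("Date").distinct().crossJoin(df2.select("id"))
daily = all_pairs.join(
    ledger.groupBy("Date", "id").agg(F.sum("delta").alias("balance")),
    ["Date", "id"], how="left"
).fillna(0, subset=["balance"])

# cumulative balance per user, ordered by date
running = Window.partitionBy("id").orderBy("Date").rowsBetween(
    Window.unboundedPreceding, Window.currentRow)
result = (daily.withColumn("current_balance", F.sum("balance").over(running))
               .join(df2, "id")
               .select("Date", "Name", "current_balance")
               .orderBy("Date", "id"))
result.show(truncate=False)

Note that Window.unboundedPreceding expresses the same unbounded frame as rowsBetween(-sys.maxsize, 0), without needing the sys import.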