识别在 Spark 中还原的值
Identifying values that revert in Spark
我有一个客户的 Spark DataFrame,如下所示。
#SparkR code
customers <- data.frame(custID = c("001", "001", "001", "002", "002", "002", "002"),
date = c("2017-02-01", "2017-03-01", "2017-04-01", "2017-01-01", "2017-02-01", "2017-03-01", "2017-04-01"),
value = c('new', 'good', 'good', 'new', 'good', 'new', 'bad'))
customers <- createDataFrame(customers)
display(customers)
custID| date | value
--------------------------
001 | 2017-02-01| new
001 | 2017-03-01| good
001 | 2017-04-01| good
002 | 2017-01-01| new
002 | 2017-02-01| good
002 | 2017-03-01| new
002 | 2017-04-01| bad
在 custID
的第一个月观察中,客户获得 value
的 'new'。此后,它们被分类为 'good' 或 'bad'。但是,客户可以在开设第二个帐户的情况下从 'good' 或 'bad' 恢复到 'new'。发生这种情况时,我想用“2”而不是“1”来标记客户,以表明他们开设了第二个帐户,如下所示。我如何在 Spark 中执行此操作? SparkR 或 PySpark 命令都有效。
#What I want to get
custID| date | value | tag
--------------------------------
001 | 2017-02-01| new | 1
001 | 2017-03-01| good | 1
001 | 2017-04-01| good | 1
002 | 2017-01-01| new | 1
002 | 2017-02-01| good | 1
002 | 2017-03-01| new | 2
002 | 2017-04-01| bad | 2
在 pyspark 中:
from pyspark.sql import functions as f
spark = SparkSession.builder.getOrCreate()
# df is equal to your customers dataframe
df = spark.read.load('file:///home/zht/PycharmProjects/test/text_file.txt', format='csv', header=True, sep='|').cache()
df_new = df.filter(df['value'] == 'new').withColumn('tag', f.rank().over(Window.partitionBy('custID').orderBy('date')))
df = df_new.union(df.filter(df['value'] != 'new').withColumn('tag', f.lit(None)))
df = df.withColumn('tag', f.collect_list('tag').over(Window.partitionBy('custID').orderBy('date'))) \
.withColumn('tag', f.UserDefinedFunction(lambda x: x.pop(), IntegerType())('tag'))
df.show()
并输出:
+------+----------+-----+---+
|custID| date|value|tag|
+------+----------+-----+---+
| 001|2017-02-01| new| 1|
| 001|2017-03-01| good| 1|
| 001|2017-04-01| good| 1|
| 002|2017-01-01| new| 1|
| 002|2017-02-01| good| 1|
| 002|2017-03-01| new| 2|
| 002|2017-04-01| bad| 2|
+------+----------+-----+---+
顺便说一句,pandas可以轻松做到。
这可以使用以下代码完成:
过滤掉所有符合"new"
的记录
df_new<-sql("select * from df where value="new")
createOrReplaceTempView(df_new,"df_new")
df_new<-sql("select *,row_number() over(partiting by custID order by date)
tag from df_new")
createOrReplaceTempView(df_new,"df_new")
df<-sql("select custID,date,value,min(tag) as tag from
(select t1.*,t2.tag from df t1 left outer join df_new t2 on
t1.custID=t2.custID and t1.date>=t2.date) group by 1,2,3")
我有一个客户的 Spark DataFrame,如下所示。
#SparkR code
customers <- data.frame(custID = c("001", "001", "001", "002", "002", "002", "002"),
date = c("2017-02-01", "2017-03-01", "2017-04-01", "2017-01-01", "2017-02-01", "2017-03-01", "2017-04-01"),
value = c('new', 'good', 'good', 'new', 'good', 'new', 'bad'))
customers <- createDataFrame(customers)
display(customers)
custID| date | value
--------------------------
001 | 2017-02-01| new
001 | 2017-03-01| good
001 | 2017-04-01| good
002 | 2017-01-01| new
002 | 2017-02-01| good
002 | 2017-03-01| new
002 | 2017-04-01| bad
在 custID
的第一个月观察中,客户获得 value
的 'new'。此后,它们被分类为 'good' 或 'bad'。但是,客户可以在开设第二个帐户的情况下从 'good' 或 'bad' 恢复到 'new'。发生这种情况时,我想用“2”而不是“1”来标记客户,以表明他们开设了第二个帐户,如下所示。我如何在 Spark 中执行此操作? SparkR 或 PySpark 命令都有效。
#What I want to get
custID| date | value | tag
--------------------------------
001 | 2017-02-01| new | 1
001 | 2017-03-01| good | 1
001 | 2017-04-01| good | 1
002 | 2017-01-01| new | 1
002 | 2017-02-01| good | 1
002 | 2017-03-01| new | 2
002 | 2017-04-01| bad | 2
在 pyspark 中:
from pyspark.sql import functions as f
spark = SparkSession.builder.getOrCreate()
# df is equal to your customers dataframe
df = spark.read.load('file:///home/zht/PycharmProjects/test/text_file.txt', format='csv', header=True, sep='|').cache()
df_new = df.filter(df['value'] == 'new').withColumn('tag', f.rank().over(Window.partitionBy('custID').orderBy('date')))
df = df_new.union(df.filter(df['value'] != 'new').withColumn('tag', f.lit(None)))
df = df.withColumn('tag', f.collect_list('tag').over(Window.partitionBy('custID').orderBy('date'))) \
.withColumn('tag', f.UserDefinedFunction(lambda x: x.pop(), IntegerType())('tag'))
df.show()
并输出:
+------+----------+-----+---+
|custID| date|value|tag|
+------+----------+-----+---+
| 001|2017-02-01| new| 1|
| 001|2017-03-01| good| 1|
| 001|2017-04-01| good| 1|
| 002|2017-01-01| new| 1|
| 002|2017-02-01| good| 1|
| 002|2017-03-01| new| 2|
| 002|2017-04-01| bad| 2|
+------+----------+-----+---+
顺便说一句,pandas可以轻松做到。
这可以使用以下代码完成:
过滤掉所有符合"new"
的记录df_new<-sql("select * from df where value="new")
createOrReplaceTempView(df_new,"df_new")
df_new<-sql("select *,row_number() over(partiting by custID order by date)
tag from df_new")
createOrReplaceTempView(df_new,"df_new")
df<-sql("select custID,date,value,min(tag) as tag from
(select t1.*,t2.tag from df t1 left outer join df_new t2 on
t1.custID=t2.custID and t1.date>=t2.date) group by 1,2,3")