Collapsing column values in Spark DataFrames

I have two DataFrames:

case class UserTransactions(id: Long, transactionDate: java.sql.Date, currencyUsed: String, value: Long)

ID, TransactionDate, CurrencyUsed, value
1, 2016-01-05, USD, 100
1, 2016-01-09, GBP, 150 
1, 2016-02-01, USD, 50
1, 2016-02-10, JPN, 10
2, 2016-01-10, EURO, 50
2, 2016-01-10, GBP, 100

case class ReportingTime(userId: Long, reportDate: java.sql.Date)

userId, reportDate
1, 2016-01-05
1, 2016-01-31
1, 2016-02-15
2, 2016-01-10
2, 2016-02-01

Now I want a summary per userId and reportDate that sums, by currency, all transactions made before that date. The result should look like this:

userId, reportDate, transactionSummary
1, 2016-01-05, None
1, 2016-01-31, (USD -> 100)(GBP -> 150) // the two transactions before 2016-01-31, combined
1, 2016-02-15, (USD -> 150)(GBP -> 150)(JPN -> 10) // all transactions before 2016-02-15, combined
2, 2016-01-10, None
2, 2016-02-01, (EURO -> 50)(GBP -> 100)

What is the best way to do this? We have more than 300 million transactions, and each user can have up to 10,000 of them.

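The snippet below does what you ask. The initial join and aggregation are done with PySpark's DataFrame API. The grouping (with reduceByKey) and the final dataset preparation are then done through the RDD API, since it is better suited to this kind of operation.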

from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType
from pyspark.sql import functions as F

df1 = spark.createDataFrame([(1,'2016-01-05','USD',100),
(1,'2016-01-09','GBP',150),
(1,'2016-02-01','USD',50),
(1,'2016-02-10','JPN',10),
(2,'2016-01-10','EURO',50),
(2,'2016-01-10','GBP',100)],['id', 'tdate', 'currency', 'value'])

df2 = spark.createDataFrame([(1,'2016-01-05'),
(1,'2016-01-31'),
(1,'2016-02-15'),
(2,'2016-01-10'),
(2,'2016-02-01')],['user_id', 'report_date'])


# UDF that converts a 'YYYY-MM-DD' string to a date
func = udf(lambda x: datetime.strptime(x, '%Y-%m-%d').date(), DateType())

df2 = df2.withColumn('report_date', func(df2.report_date))
df1 = df1.withColumn('tdate', func(df1.tdate))

# Left join each report to the user's strictly earlier transactions,
# then sum the values per (user, report date, currency)
result = (df2.join(df1, (df1.id == df2.user_id) & (df1.tdate < df2.report_date), 'left_outer')
          .select('user_id', 'report_date', 'currency', 'value')
          .groupBy('user_id', 'report_date', 'currency')
          .agg(F.sum('value').alias('value')))

# Collapse the per-currency rows into one list of (currency, value) pairs per report;
# a report with no earlier transactions gets an empty list
data = (result.rdd
        .map(lambda x: ((x.user_id, x.report_date), [(x.currency, x.value)] if x.currency else []))
        .reduceByKey(lambda x, y: x + y)
        .map(lambda x: (x[0][0], x[0][1], x[1])))

The final result looks like this (the u'' prefixes show that this output came from Python 2):

>>> spark.createDataFrame([ (x[0],x[1],str(x[2])) for x in data.collect()], ['id', 'date', 'values']).orderBy('id', 'date').show(20, False)
+---+----------+--------------------------------------------+
|id |date      |values                                      |
+---+----------+--------------------------------------------+
|1  |2016-01-05|[]                                          |
|1  |2016-01-31|[(u'USD', 100), (u'GBP', 150)]              |
|1  |2016-02-15|[(u'USD', 150), (u'GBP', 150), (u'JPN', 10)]|
|2  |2016-01-10|[]                                          |
|2  |2016-02-01|[(u'EURO', 50), (u'GBP', 100)]              |
+---+----------+--------------------------------------------+
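
The RDD step above packs keying, wrapping, and merging into a single expression, so here is a minimal plain-Python sketch (no Spark needed) of what it does to the aggregated rows. The `rows` list is typed in by hand to mirror what the join and groupBy should produce for the sample data, and `reduce_by_key` is a hypothetical local stand-in for `RDD.reduceByKey`, not part of any library.

```python
from datetime import date

# Hand-written stand-in for the output of the join + groupBy/sum step:
# (user_id, report_date, currency, value). Unmatched reports carry None.
rows = [
    (1, date(2016, 1, 5), None, None),
    (1, date(2016, 1, 31), 'USD', 100),
    (1, date(2016, 1, 31), 'GBP', 150),
    (1, date(2016, 2, 15), 'USD', 150),
    (1, date(2016, 2, 15), 'GBP', 150),
    (1, date(2016, 2, 15), 'JPN', 10),
    (2, date(2016, 1, 10), None, None),
    (2, date(2016, 2, 1), 'EURO', 50),
    (2, date(2016, 2, 1), 'GBP', 100),
]


def reduce_by_key(pairs, combine):
    """Hypothetical local equivalent of RDD.reduceByKey for a list of (key, value)."""
    merged = {}
    for key, value in pairs:
        merged[key] = combine(merged[key], value) if key in merged else value
    return merged


# Key each row by (user_id, report_date) and wrap its (currency, value) pair
# in a one-element list; rows with no currency become an empty list.
pairs = [((r[0], r[1]), [(r[2], r[3])] if r[2] else []) for r in rows]

# Concatenating the lists per key collapses each report into a single record.
collapsed = reduce_by_key(pairs, lambda x, y: x + y)

for (user_id, report_date), values in sorted(collapsed.items()):
    print(user_id, report_date, values)
```

Because every (user, report date, currency) combination appears only once after the groupBy, list concatenation is enough here and no second summation is needed.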

In case anyone needs it in Scala:

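import java.text.SimpleDateFormat
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, sum}
import spark.implicits._  // already in scope in spark-shell; needed elsewhere for .as[...] and .toDF

case class Transaction(id: String, date: java.sql.Date, currency:Option[String], value: Option[Long])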
case class Report(id:String, date:java.sql.Date)

def toDate(date: String): java.sql.Date = {
  val sf = new SimpleDateFormat("yyyy-MM-dd")
  new java.sql.Date(sf.parse(date).getTime)
}

val allTransactions = Seq(
  Transaction("1", toDate("2016-01-05"),Some("USD"),Some(100L)),
  Transaction("1", toDate("2016-01-09"),Some("GBP"),Some(150L)),
  Transaction("1",toDate("2016-02-01"),Some("USD"),Some(50L)),
  Transaction("1",toDate("2016-02-10"),Some("JPN"),Some(10L)),
  Transaction("2",toDate("2016-01-10"),Some("EURO"),Some(50L)),
  Transaction("2",toDate("2016-01-10"),Some("GBP"),Some(100L))
)
val allReports = Seq(
  Report("1",toDate("2016-01-05")),
  Report("1",toDate("2016-01-31")),
  Report("1",toDate("2016-02-15")),
  Report("2",toDate("2016-01-10")),
  Report("2",toDate("2016-02-01"))
)

val transactions: Dataset[Transaction] = spark.createDataFrame(allTransactions).as[Transaction]
val reports: Dataset[Report] = spark.createDataFrame(allReports).as[Report]

val result = reports.alias("rp").join(transactions.alias("tx"), (col("tx.id") === col("rp.id")) && (col("tx.date") < col("rp.date")), "left_outer")
  .select("rp.id", "rp.date", "currency", "value")
  .groupBy("rp.id", "rp.date", "currency").agg(sum("value"))
  .toDF("id", "date", "currency", "value")
  .as[Transaction]

val data = result.rdd.keyBy(x => (x.id , x.date))
  .mapValues(x => if (x.currency.isDefined) collection.Map[String, Long](x.currency.get -> x.value.get) else collection.Map[String, Long]())
  .reduceByKey((x,y) => x ++ y).map(x => (x._1._1, x._1._2, x._2))
  .toDF("id", "date", "map")
  .orderBy("id", "date")

Console output:

+---+----------+--------------------------------------+
|id |date      |map                                   |
+---+----------+--------------------------------------+
|1  |2016-01-05|Map()                                 |
|1  |2016-01-31|Map(GBP -> 150, USD -> 100)           |
|1  |2016-02-15|Map(USD -> 150, GBP -> 150, JPN -> 10)|
|2  |2016-01-10|Map()                                 |
|2  |2016-02-01|Map(GBP -> 100, EURO -> 50)           |
+---+----------+--------------------------------------+
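
To sanity-check either version on a small sample, it can help to compare against a brute-force reference. The following is only a sketch in plain Python, not something meant for 300 million rows: `summarize` is a hypothetical helper name, and the data is the sample from the question. It uses the same strict "earlier than the report date" rule as the joins above, which is why a report dated the same day as a transaction gets an empty summary.

```python
from collections import defaultdict
from datetime import date

# Sample data from the question: (id, transaction date, currency, value)
transactions = [
    (1, date(2016, 1, 5), 'USD', 100),
    (1, date(2016, 1, 9), 'GBP', 150),
    (1, date(2016, 2, 1), 'USD', 50),
    (1, date(2016, 2, 10), 'JPN', 10),
    (2, date(2016, 1, 10), 'EURO', 50),
    (2, date(2016, 1, 10), 'GBP', 100),
]

# (user id, report date)
reports = [
    (1, date(2016, 1, 5)),
    (1, date(2016, 1, 31)),
    (1, date(2016, 2, 15)),
    (2, date(2016, 1, 10)),
    (2, date(2016, 2, 1)),
]


def summarize(transactions, reports):
    """Hypothetical reference: per report, sum earlier transactions by currency."""
    by_user = defaultdict(list)
    for user_id, tdate, currency, value in transactions:
        by_user[user_id].append((tdate, currency, value))

    summary = {}
    for user_id, report_date in reports:
        totals = defaultdict(int)
        for tdate, currency, value in by_user[user_id]:
            if tdate < report_date:  # strictly before, as in the join condition
                totals[currency] += value
        summary[(user_id, report_date)] = dict(totals)
    return summary


for key, totals in sorted(summarize(transactions, reports).items()):
    print(key[0], key[1], totals)
```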