如何在火花中处理这个
how to handle this in spark
我正在使用 spark-sql 2.4.x 版本,datastax-spark-cassandra-connector for Cassandra-3.x 版本。与卡夫卡一起。
我有一些来自 kafka 主题的财务数据的场景。数据(基础数据集)包含 companyId, year , prev_year 字段信息。
如果列年 === prev_year 那么我需要加入不同的 table 即 exchange_rates.
如果列年份 =!= prev_year 那么我需要 return 基础数据集本身
如何在 spark-sql 中执行此操作?
您的情况可以参考以下方法。
scala> Input_df.show
+---------+----+---------+----+
|companyId|year|prev_year|rate|
+---------+----+---------+----+
| 1|2016| 2017| 12|
| 1|2017| 2017|21.4|
| 2|2018| 2017|11.7|
| 2|2018| 2018|44.6|
| 3|2016| 2017|34.5|
| 4|2017| 2017| 56|
+---------+----+---------+----+
scala> exch_rates.show
+---------+----+
|companyId|rate|
+---------+----+
| 1|12.3|
| 2|12.5|
| 3|22.3|
| 4|34.6|
| 5|45.2|
+---------+----+
scala> val equaldf = Input_df.filter(col("year") === col("prev_year"))
scala> val notequaldf = Input_df.filter(col("year") =!= col("prev_year"))
scala> val joindf = notequaldf.alias("n").drop("rate").join(exch_rates.alias("e"), List("companyId"), "left")
scala> val finalDF = equaldf.union(joindf)
scala> finalDF.show()
+---------+----+---------+----+
|companyId|year|prev_year|rate|
+---------+----+---------+----+
| 1|2017| 2017|21.4|
| 2|2018| 2018|44.6|
| 4|2017| 2017| 56|
| 1|2016| 2017|12.3|
| 2|2018| 2017|12.5|
| 3|2016| 2017|22.3|
+---------+----+---------+----+
我正在使用 spark-sql 2.4.x 版本,datastax-spark-cassandra-connector for Cassandra-3.x 版本。与卡夫卡一起。
我有一些来自 kafka 主题的财务数据的场景。数据(基础数据集)包含 companyId, year , prev_year 字段信息。
如果列年 === prev_year 那么我需要加入不同的 table 即 exchange_rates.
如果列年份 =!= prev_year 那么我需要 return 基础数据集本身
如何在 spark-sql 中执行此操作?
您的情况可以参考以下方法。
scala> Input_df.show
+---------+----+---------+----+
|companyId|year|prev_year|rate|
+---------+----+---------+----+
| 1|2016| 2017| 12|
| 1|2017| 2017|21.4|
| 2|2018| 2017|11.7|
| 2|2018| 2018|44.6|
| 3|2016| 2017|34.5|
| 4|2017| 2017| 56|
+---------+----+---------+----+
scala> exch_rates.show
+---------+----+
|companyId|rate|
+---------+----+
| 1|12.3|
| 2|12.5|
| 3|22.3|
| 4|34.6|
| 5|45.2|
+---------+----+
scala> val equaldf = Input_df.filter(col("year") === col("prev_year"))
scala> val notequaldf = Input_df.filter(col("year") =!= col("prev_year"))
scala> val joindf = notequaldf.alias("n").drop("rate").join(exch_rates.alias("e"), List("companyId"), "left")
scala> val finalDF = equaldf.union(joindf)
scala> finalDF.show()
+---------+----+---------+----+
|companyId|year|prev_year|rate|
+---------+----+---------+----+
| 1|2017| 2017|21.4|
| 2|2018| 2018|44.6|
| 4|2017| 2017| 56|
| 1|2016| 2017|12.3|
| 2|2018| 2017|12.5|
| 3|2016| 2017|22.3|
+---------+----+---------+----+