使用 Spark Scala 提取每月数据
Pull monthly data using Spark Scala
我正在尝试从文件中提取一个月的数据,然后处理 it.Basically 我需要为每个月提取数据并进行一些转换。由于我的工作每天运行,我想利用它并填充该月的数据,直到 run_date.
我有两种方法:
方法一:
仅填充上个月的数据。例如,如果我的 current_date 或 run_date 在 May
月份,我将填充 April
月份的数据。
这可以通过从 current_date()
中提取月份并从中减去 1
来实现。
类似于以下内容:
df.filter(month(to_date(col("startDate")))===month(to_date(current_date())-1))
这只是一个想法。这段代码不会实现我想要做的事情,因为我单独减去月份部分而不考虑 Year
部分。
但在这种情况下,我的工作是每天 运行 填充整个月的相同数据。
这样做没有意义。
方法二:
如果我的 current_date 是 2020-05-27
,我想从 2020-05-01 to 2020-05-26
中提取数据。
如果我当前的日期是 2020-06-01
,它应该填充从 2020-05-01 to 2020-05-31
.
开始的 5 月份的数据
我想实施方法 2。我能想到的唯一想法是写几个 Case
语句来检查日期并相应地填充它。
有人可以分享一些想法吗?有没有稍微直截了当的方法呢
我正在使用Spark 1.5
检查这是否有帮助-
1。加载测试数据
val data =
"""
|2018-04-07 07:07:17
|2018-04-07 07:32:27
|2018-04-07 08:36:44
|2018-04-07 08:38:00
|2018-04-07 08:39:29
|2018-04-08 01:43:08
|2018-04-08 01:43:55
|2018-04-09 07:52:31
|2018-04-09 07:52:42
|2019-01-24 11:52:31
|2019-01-24 12:52:42
|2019-01-25 12:52:42
""".stripMargin
val df = spark.read
.schema(StructType(Array(StructField("startDate", DataTypes.TimestampType))))
.csv(data.split(System.lineSeparator()).toSeq.toDS())
df.show(false)
df.printSchema()
输出-
+-------------------+
|startDate |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
|2019-01-24 11:52:31|
|2019-01-24 12:52:42|
|2019-01-25 12:52:42|
+-------------------+
root
|-- startDate: timestamp (nullable = true)
2。根据 current date
创建过滤列
val filterCOl = (currentDate: String) => when(datediff(date_format(lit(currentDate), "yyyy-MM-dd")
,date_format(lit(currentDate), "yyyy-MM-01"))===lit(0),
date_format(col("startDate"), "yyyy-MM") ===
date_format(concat_ws("-",year(lit(currentDate)), month(lit(currentDate)) -1), "yyyy-MM")
).otherwise(to_date(col("startDate"))
.between(date_format(lit(currentDate), "yyyy-MM-01"), lit(currentDate)))
3。检查当前数据何时在月
之间
var currentDateStr = "2018-04-08"
df.filter(filterCOl(currentDateStr)).show(false)
输出-
+-------------------+
|startDate |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
+-------------------+
4。检查当前数据是当月的第一天
currentDateStr = "2018-05-01"
df.filter(filterCOl(currentDateStr)).show(false)
输出-
+-------------------+
|startDate |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
+-------------------+
我正在尝试从文件中提取一个月的数据,然后处理 it.Basically 我需要为每个月提取数据并进行一些转换。由于我的工作每天运行,我想利用它并填充该月的数据,直到 run_date.
我有两种方法:
方法一:
仅填充上个月的数据。例如,如果我的 current_date 或 run_date 在 May
月份,我将填充 April
月份的数据。
这可以通过从 current_date()
中提取月份并从中减去 1
来实现。
类似于以下内容:
df.filter(month(to_date(col("startDate")))===month(to_date(current_date())-1))
这只是一个想法。这段代码不会实现我想要做的事情,因为我单独减去月份部分而不考虑 Year
部分。
但在这种情况下,我的工作是每天 运行 填充整个月的相同数据。 这样做没有意义。
方法二:
如果我的 current_date 是 2020-05-27
,我想从 2020-05-01 to 2020-05-26
中提取数据。
如果我当前的日期是 2020-06-01
,它应该填充从 2020-05-01 to 2020-05-31
.
我想实施方法 2。我能想到的唯一想法是写几个 Case
语句来检查日期并相应地填充它。
有人可以分享一些想法吗?有没有稍微直截了当的方法呢
我正在使用Spark 1.5
检查这是否有帮助-
1。加载测试数据
val data =
"""
|2018-04-07 07:07:17
|2018-04-07 07:32:27
|2018-04-07 08:36:44
|2018-04-07 08:38:00
|2018-04-07 08:39:29
|2018-04-08 01:43:08
|2018-04-08 01:43:55
|2018-04-09 07:52:31
|2018-04-09 07:52:42
|2019-01-24 11:52:31
|2019-01-24 12:52:42
|2019-01-25 12:52:42
""".stripMargin
val df = spark.read
.schema(StructType(Array(StructField("startDate", DataTypes.TimestampType))))
.csv(data.split(System.lineSeparator()).toSeq.toDS())
df.show(false)
df.printSchema()
输出-
+-------------------+
|startDate |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
|2019-01-24 11:52:31|
|2019-01-24 12:52:42|
|2019-01-25 12:52:42|
+-------------------+
root
|-- startDate: timestamp (nullable = true)
2。根据 current date
创建过滤列
val filterCOl = (currentDate: String) => when(datediff(date_format(lit(currentDate), "yyyy-MM-dd")
,date_format(lit(currentDate), "yyyy-MM-01"))===lit(0),
date_format(col("startDate"), "yyyy-MM") ===
date_format(concat_ws("-",year(lit(currentDate)), month(lit(currentDate)) -1), "yyyy-MM")
).otherwise(to_date(col("startDate"))
.between(date_format(lit(currentDate), "yyyy-MM-01"), lit(currentDate)))
3。检查当前数据何时在月
之间 var currentDateStr = "2018-04-08"
df.filter(filterCOl(currentDateStr)).show(false)
输出-
+-------------------+
|startDate |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
+-------------------+
4。检查当前数据是当月的第一天
currentDateStr = "2018-05-01"
df.filter(filterCOl(currentDateStr)).show(false)
输出-
+-------------------+
|startDate |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
+-------------------+