PySpark: How to add columns whose data come from a query (similar to subquery for each row)
I have a holidays table:
- start: date
- end: date
- type: enum(HOLIDAY|LONG_WEEKENDS)
Some sample data:
"start","end","type"
"2019-01-01","2019-01-01","HOLIDAY"
"2019-02-05","2019-02-06","HOLIDAY"
"2019-03-16","2019-03-24","HOLIDAY"
"2019-04-19","2019-04-19","HOLIDAY"
"2019-10-04","2019-10-04","HOLIDAY"
"2019-08-08","2019-08-13","LONG_WEEKENDS"
"2019-10-25","2019-10-29","LONG_WEEKENDS"
"2019-12-20","2020-01-02","LONG_WEEKENDS"
And a flights table, which for simplicity has:
- id: varchar
- out_date: date
- in_date: date
Some sample data:
"id","out_date","in_date"
"25997661","2019-02-08","2019-02-12"
"25997658","2019-02-08","2019-02-12"
"25997659","2019-02-08","2019-02-12"
"25997662","2019-02-08","2019-02-12"
"25997663","2019-02-08","2019-02-12"
"25997657","2019-02-08","2019-02-12"
"25997660","2019-02-08","2019-02-12"
"25997397","2019-02-08","2019-02-12"
I want to add 4 columns to the flights table, like:
- out_date_is_holiday: boolean
- out_date_is_longweekends: boolean
- in_date_is_holiday: boolean
- in_date_is_longweekends: boolean
So the "stupid" way would be to download the holidays table, and then for each flight (in pyspark):
- add a column out_date_is_holiday: if out_date is between holidays.start and holidays.end AND holidays.type = 'HOLIDAY' then true else false
and likewise for the other 3 columns. How can I do this efficiently? I'm doing this on AWS Glue, if that matters. A brute-force sketch of what I mean follows.
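To make the shape concrete, here is a minimal PySpark sketch, assuming the two tables are already loaded as DataFrames named flights and holidays (both names are mine). The holidays table is tiny, so broadcasting it keeps the cross join cheap, and max() over a boolean flag acts as "true if any holiday row matched":

from pyspark.sql import functions as F

# Assumed names: `flights` and `holidays` are the DataFrames described above.
joined = flights.crossJoin(F.broadcast(holidays))

def flag(date_col, holiday_type):
    # true when the flight date falls inside a holiday row of the given type
    return ((F.col("type") == holiday_type) &
            F.col(date_col).between(F.col("start"), F.col("end")))

result = (joined
          .groupBy("id", "out_date", "in_date")
          .agg(F.max(flag("out_date", "HOLIDAY")).alias("out_date_is_holiday"),
               F.max(flag("out_date", "LONG_WEEKENDS")).alias("out_date_is_longweekends"),
               F.max(flag("in_date", "HOLIDAY")).alias("in_date_is_holiday"),
               F.max(flag("in_date", "LONG_WEEKENDS")).alias("in_date_is_longweekends")))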
UPDATE
Following the suggestion from @stack0114106, I tried:
WITH t (
SELECT
f.outboundlegid,
f.inboundlegid,
f.agent,
f.querydatetime,
CASE WHEN type = 'HOLIDAY' AND (out_date BETWEEN start AND end)
THEN true
ELSE false
END out_is_holiday,
CASE WHEN type = 'LONG_WEEKENDS' AND (out_date BETWEEN start AND end)
THEN true
ELSE false
END out_is_longweekends,
CASE WHEN type = 'HOLIDAY' AND (in_date BETWEEN start AND end)
THEN true
ELSE false
END in_is_holiday,
CASE WHEN type = 'LONG_WEEKENDS' AND (in_date BETWEEN start AND end)
THEN true
ELSE false
END in_is_longweekends
FROM flights f
CROSS JOIN holidays h
)
SELECT
f.*,
CASE WHEN array_contains(collect_set(out_is_holiday), true)
THEN true
ELSE false
END out_is_holiday,
CASE WHEN array_contains(collect_set(out_is_longweekends), true)
THEN true
ELSE false
END out_is_longweekends,
CASE WHEN array_contains(collect_set(in_is_holiday), true)
THEN true
ELSE false
END in_is_holiday,
CASE WHEN array_contains(collect_set(in_is_longweekends), true)
THEN true
ELSE false
END in_is_longweekends
FROM t f
GROUP BY
f.querydatetime,
f.outboundlegid,
f.inboundlegid,
f.agent
LIMIT 1000000
But I got:
pyspark.sql.utils.AnalysisException: u"expression 'f.`out_is_holiday`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;\nGlobalLimit 1000000\n+- LocalLimit 1000000\n +- Aggregate [querydatetime#231, outboundlegid#208, inboundlegid#209, agent#205], [outboundlegid#208, inboundlegid#209, agent#205, querydatetime#231, out_is_holiday#347, out_is_longweekends#348, in_is_holiday#349, in_is_longweekends#350, CASE WHEN array_contains(collect_set(out_is_holiday#347, 0, 0), true) THEN true ELSE false END AS out_is_holiday#343, CASE WHEN array_contains(collect_set(out_is_longweekends#348, 0, 0), true) THEN true ELSE false END AS out_is_longweekends#344, CASE WHEN array_contains(collect_set(in_is_holiday#349, 0, 0), true) THEN true ELSE false END AS in_is_holiday#345, CASE WHEN array_contains(collect_set(in_is_longweekends#350, 0, 0), true) THEN true ELSE false END AS in_is_longweekends#346]\n +- SubqueryAlias f\n +- SubqueryAlias t\n +- Project [outboundlegid#208, inboundlegid#209, agent#205, querydatetime#231, CASE WHEN ((type#57 = HOLIDAY) && ((out_date#267 >= start#55) && (out_date#267 <= end#56))) THEN true ELSE false END AS out_is_holiday#347, CASE WHEN ((type#57 = LONG_WEEKENDS) && ((out_date#267 >= start#55) && (out_date#267 <= end#56))) THEN true ELSE false END AS out_is_longweekends#348, CASE WHEN ((type#57 = HOLIDAY) && ((in_date#304 >= start#55) && (in_date#304 <= end#56))) THEN true ELSE false END AS in_is_holiday#349, CASE WHEN ((type#57 = LONG_WEEKENDS) && ((in_date#304 >= start#55) && (in_date#304 <= end#56))) THEN true ELSE false END AS in_is_longweekends#350]\n +- Join Cross\n :- SubqueryAlias f\n : +- SubqueryAlias flights\n : +- Project [Id#198, QueryTaskId#199, QueryOriginPlace#200, QueryOutboundDate#201, QueryInboundDate#202, QueryCabinClass#203, QueryCurrency#204, Agent#205, QuoteAgeInMinutes#206, Price#207, OutboundLegId#208, InboundLegId#209, OutDeparture#210, OutArrival#211, OutDuration#212, OutJourneyMode#213, OutStops#214, OutCarriers#215, OutOperatingCarriers#216, NumberOutStops#217, NumberOutCarriers#218, NumberOutOperatingCarriers#219, InDeparture#220, InArrival#221, ... 12 more fields]\n : +- Project [Id#198, QueryTaskId#199, QueryOriginPlace#200, QueryOutboundDate#201, QueryInboundDate#202, QueryCabinClass#203, QueryCurrency#204, Agent#205, QuoteAgeInMinutes#206, Price#207, OutboundLegId#208, InboundLegId#209, OutDeparture#210, OutArrival#211, OutDuration#212, OutJourneyMode#213, OutStops#214, OutCarriers#215, OutOperatingCarriers#216, NumberOutStops#217, NumberOutCarriers#218, NumberOutOperatingCarriers#219, InDeparture#220, InArrival#221, ... 11 more fields]\n : +- LogicalRDD [Id#198, QueryTaskId#199, QueryOriginPlace#200, QueryOutboundDate#201, QueryInboundDate#202, QueryCabinClass#203, QueryCurrency#204, Agent#205, QuoteAgeInMinutes#206, Price#207, OutboundLegId#208, InboundLegId#209, OutDeparture#210, OutArrival#211, OutDuration#212, OutJourneyMode#213, OutStops#214, OutCarriers#215, OutOperatingCarriers#216, NumberOutStops#217, NumberOutCarriers#218, NumberOutOperatingCarriers#219, InDeparture#220, InArrival#221, ... 10 more fields]\n +- SubqueryAlias h\n +- SubqueryAlias holidays\n +- LogicalRDD [start#55, end#56, type#57]\n"
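If I read the error right, the outer `f.*` pulls the raw per-row flags out of `t` even though they are neither in the GROUP BY nor wrapped in an aggregate. A hedged sketch of a fix, selecting only the grouped columns and aggregating the flags (max over a boolean is true if any cross-joined holiday row matched), assuming flights and holidays are registered as temp views:

fixed = spark.sql("""
    WITH t AS (
      SELECT
        f.outboundlegid, f.inboundlegid, f.agent, f.querydatetime,
        CASE WHEN type = 'HOLIDAY'       AND out_date BETWEEN start AND end THEN true ELSE false END out_is_holiday,
        CASE WHEN type = 'LONG_WEEKENDS' AND out_date BETWEEN start AND end THEN true ELSE false END out_is_longweekends,
        CASE WHEN type = 'HOLIDAY'       AND in_date  BETWEEN start AND end THEN true ELSE false END in_is_holiday,
        CASE WHEN type = 'LONG_WEEKENDS' AND in_date  BETWEEN start AND end THEN true ELSE false END in_is_longweekends
      FROM flights f
      CROSS JOIN holidays h
    )
    SELECT
      querydatetime, outboundlegid, inboundlegid, agent,
      max(out_is_holiday)      AS out_is_holiday,
      max(out_is_longweekends) AS out_is_longweekends,
      max(in_is_holiday)       AS in_is_holiday,
      max(in_is_longweekends)  AS in_is_longweekends
    FROM t
    GROUP BY querydatetime, outboundlegid, inboundlegid, agent
""")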
Trying to use the foldLeft/reduce functions to add these 4 columns seems like overkill just to reduce code. It seems to me this can be accomplished by constructing the SQL string rather than falling back on DataFrame operations. Check this out:
scala> val holiday = Seq(("2019-01-01","2019-01-01","HOLIDAY"),
| ("2019-02-05","2019-02-06","HOLIDAY"),
| ("2019-03-16","2019-03-24","HOLIDAY"),
| ("2019-04-19","2019-04-19","HOLIDAY"),
| ("2019-10-04","2019-10-04","HOLIDAY"),
| ("2019-08-08","2019-08-13","LONG_WEEKENDS"),
| ("2019-10-25","2019-10-29","LONG_WEEKENDS"),
| ("2019-12-20","2020-01-02","LONG_WEEKENDS")
| ).toDF("start","end","type")
holiday: org.apache.spark.sql.DataFrame = [start: string, end: string ... 1 more field]
scala> val flight = Seq(("25997661","2019-02-08","2019-02-12"),
| ("25997658","2019-02-05","2019-02-12"), // modified to get "true" values
| ("25997659","2019-02-08","2019-02-12"),
| ("25997662","2019-02-08","2019-02-12"),
| ("25997663","2019-02-08","2019-02-12"),
| ("25997657","2019-02-08","2019-02-12"),
| ("25997660","2019-02-08","2019-02-12"),
| ("25997397","2019-02-08","2019-02-12")
| ).toDF("id","out_date","in_date")
flight: org.apache.spark.sql.DataFrame = [id: string, out_date: string ... 1 more field]
scala> val df = flight.crossJoin(holiday).withColumn("out_date",to_date('out_date)).withColumn("in_date",to_date('in_date)).withColumn("start",to_date('start)).withColumn("endx",to_date('end)).withColumn("typex",'type)
df: org.apache.spark.sql.DataFrame = [id: string, out_date: date ... 6 more fields]
scala> df.createOrReplaceTempView("jiew")
scala> val od_holiday = """ case when out_date >= start and out_date <= endx and typex='HOLIDAY' then true else false end out_date_is_holiday """
od_holiday: String = " case when out_date >= start and out_date <= endx and typex='HOLIDAY' then true else false end out_date_is_holiday "
scala> val od_longweek = """ case when out_date >= start and out_date <= endx and typex='LONG_WEEKENDS' then true else false end out_date_is_longweekends """
od_longweek: String = " case when out_date >= start and out_date <= endx and typex='LONG_WEEKENDS' then true else false end out_date_is_longweekends "
scala> val id_holiday = """ case when in_date >= start and in_date <= endx and typex='HOLIDAY' then true else false end in_date_is_holiday """
id_holiday: String = " case when in_date >= start and in_date <= endx and typex='HOLIDAY' then true else false end in_date_is_holiday "
scala> val id_longweek = """ case when in_date >= start and in_date <= endx and typex='LONG_WEEKENDS' then true else false end in_date_is_longweekends """
id_longweek: String = " case when in_date >= start and in_date <= endx and typex='LONG_WEEKENDS' then true else false end in_date_is_longweekends "
scala> val sel_columns = Array(od_holiday,od_longweek,id_holiday,id_longweek).mkString(",")
sel_columns: String = " case when out_date >= start and out_date <= endx and typex='HOLIDAY' then true else false end out_date_is_holiday , case when out_date >= start and out_date <= endx and typex='LONG_WEEKENDS' then true else false end out_date_is_longweekends , case when in_date >= start and in_date <= endx and typex='HOLIDAY' then true else false end in_date_is_holiday , case when in_date >= start and in_date <= endx and typex='LONG_WEEKENDS' then true else false end in_date_is_longweekends "
scala> val new_columns = Array("out_date_is_holiday","out_date_is_longweekends","in_date_is_holiday","in_date_is_longweekends")
new_columns: Array[String] = Array(out_date_is_holiday, out_date_is_longweekends, in_date_is_holiday, in_date_is_longweekends)
scala> val group_sel_columns = new_columns.map( x => s"case when array_contains(collect_set("+x+"),true) then true else false end "+x )
group_sel_columns: Array[String] = Array(case when array_contains(collect_set(out_date_is_holiday),true) then true else false end out_date_is_holiday, case when array_contains(collect_set(out_date_is_longweekends),true) then true else false end out_date_is_longweekends, case when array_contains(collect_set(in_date_is_holiday),true) then true else false end in_date_is_holiday, case when array_contains(collect_set(in_date_is_longweekends),true) then true else false end in_date_is_longweekends)
scala> val group_sel_columns_str = group_sel_columns.mkString(",")
group_sel_columns_str: String = case when array_contains(collect_set(out_date_is_holiday),true) then true else false end out_date_is_holiday,case when array_contains(collect_set(out_date_is_longweekends),true) then true else false end out_date_is_longweekends,case when array_contains(collect_set(in_date_is_holiday),true) then true else false end in_date_is_holiday,case when array_contains(collect_set(in_date_is_longweekends),true) then true else false end in_date_is_longweekends
scala> spark.sql( s""" with t1 ( select t.*, ${sel_columns} from jiew t) select id,out_date,in_date, ${group_sel_columns_str} from t1 group by id,out_date,in_date """).show(false)
+--------+----------+----------+-------------------+------------------------+------------------+-----------------------+
|id |out_date |in_date |out_date_is_holiday|out_date_is_longweekends|in_date_is_holiday|in_date_is_longweekends|
+--------+----------+----------+-------------------+------------------------+------------------+-----------------------+
|25997663|2019-02-08|2019-02-12|false |false |false |false |
|25997657|2019-02-08|2019-02-12|false |false |false |false |
|25997662|2019-02-08|2019-02-12|false |false |false |false |
|25997397|2019-02-08|2019-02-12|false |false |false |false |
|25997660|2019-02-08|2019-02-12|false |false |false |false |
|25997659|2019-02-08|2019-02-12|false |false |false |false |
|25997661|2019-02-08|2019-02-12|false |false |false |false |
|25997658|2019-02-05|2019-02-12|true |false |false |false |
+--------+----------+----------+-------------------+------------------------+------------------+-----------------------+
scala> val df2 = spark.sql( s""" with t1 ( select t.*, ${sel_columns} from jiew t) select id,out_date,in_date, ${group_sel_columns_str} from t1 group by id,out_date,in_date """)
df2: org.apache.spark.sql.DataFrame = [id: string, out_date: date ... 5 more fields]
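Since the question is about PySpark, a rough Python translation of the same session (my sketch, keeping the temp view name jiew and the generated SQL) might look like this:

from pyspark.sql import functions as F

# `flight` and `holiday` are the same two DataFrames as in the Scala shell.
df = (flight.crossJoin(holiday)
      .withColumn("out_date", F.to_date("out_date"))
      .withColumn("in_date", F.to_date("in_date"))
      .withColumn("start", F.to_date("start"))
      .withColumn("endx", F.to_date("end"))
      .withColumn("typex", F.col("type")))
df.createOrReplaceTempView("jiew")

new_columns = ["out_date_is_holiday", "out_date_is_longweekends",
               "in_date_is_holiday", "in_date_is_longweekends"]
dates = ["out_date", "out_date", "in_date", "in_date"]
types = ["HOLIDAY", "LONG_WEEKENDS", "HOLIDAY", "LONG_WEEKENDS"]

# build the per-row CASE expressions, then the grouped "any true" expressions
sel_columns = ",".join(
    f"case when {d} >= start and {d} <= endx and typex='{t}' "
    f"then true else false end {c}"
    for d, t, c in zip(dates, types, new_columns))
group_sel_columns_str = ",".join(
    f"case when array_contains(collect_set({c}),true) then true else false end {c}"
    for c in new_columns)

df2 = spark.sql(f"""
    with t1 as ( select t.*, {sel_columns} from jiew t)
    select id, out_date, in_date, {group_sel_columns_str}
    from t1 group by id, out_date, in_date
""")
df2.show(truncate=False)

Note that array_contains(collect_set(flag), true) is just an "any true" aggregate; max(flag) would express the same thing in a single call.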