PySpark timeout trying to repartition/write to parquet (Futures timed out after [300 seconds])?

I am running PySpark (on AWS Glue, if that matters). I am getting a timeout error (it seems to fail while writing to Parquet):

The full log is at https://pastebin.com/TmuAcFx7

File "script_2019-02-06-02-32-43.py", line 197, in <module>
.parquet("s3://xxx-glue/cleanedFlights")
File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 691, in parquet
File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o246.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply$mcV$sp(FileFormatWriter.scala:213)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(countryName#24, querydatetime#213, 200)
+- *Project [master_key#588, master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 33 more fields]
+- BatchEvalPython [getInterval(cast(date_format(outdeparture#416, H, Some(Zulu)) as int), 0, 24, 4), getInterval(cast(date_format(indeparture#498, H, Some(Zulu)) as int), 0, 24, 4)], [master_key#588, master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 21 more fields]
+- *Sort [key#250 ASC NULLS FIRST, querydatetime#101 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(key#250 ASC NULLS FIRST, querydatetime#101 ASC NULLS FIRST, 200)
+- *Project [key#250 AS master_key#588, querydatetime#101 AS master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 19 more fields]
+- *BroadcastHashJoin [key#250, querydatetime#101], [key#590, querydatetime#213], LeftOuter, BuildRight
:- *Project [key#250, querydatetime#101]
: +- BroadcastNestedLoopJoin BuildRight, Cross
: :- Generate explode(pythonUDF0#1633), false, false, [querydatetime#101]
: : +- BatchEvalPython [generate_date_series(start#94, stop#95)], [start#94, stop#95, pythonUDF0#1633]
: : +- Scan ExistingRDD[start#94,stop#95]
: +- BroadcastExchange IdentityBroadcastMode
: +- *HashAggregate(keys=[key#250], functions=[], output=[key#250])
: +- *HashAggregate(keys=[key#250], functions=[], output=[key#250])
: +- *Sample 0.0, 0.001, false, 7736333241016522154
: +- *GlobalLimit 5000000
: +- Exchange SinglePartition
: +- *LocalLimit 5000000
: +- *Project [concat(outboundlegid#190, -, inboundlegid#191, -, agent#187) AS key#250]
: +- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner
: :- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(querydestinationplace#212, 200)
: : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212]
: : +- *SortMergeJoin [agent#187], [id#89], Inner
: : :- *Sort [agent#187 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(agent#187, 200)
: : : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212]
: : : +- *Filter (isnotnull(agent#187) && isnotnull(querydestinationplace#212))
: : : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields]
: : +- *Sort [id#89 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(id#89, 200)
: : +- *Project [cast(id#67L as string) AS id#89]
: : +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string)))
: : +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75]
: +- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cast(airportId#38 as int), 200)
: +- *Project [cast(airportId#18L as string) AS airportId#38]
: +- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string)))
: +- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[34, string, true], input[33, date, true]))
+- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, cast(QueryOutboundDate#183 as date) AS queryoutbounddate#334, cast(QueryInboundDate#184 as date) AS queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, cast(unix_timestamp(OutDeparture#192, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outdeparture#416, cast(unix_timestamp(OutArrival#193, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, cast(unix_timestamp(InDeparture#202, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS indeparture#498, cast(unix_timestamp(InArrival#203, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS inarrival#539, ... 15 more fields]
+- *Sample 0.0, 0.001, false, 7736333241016522154
+- *GlobalLimit 5000000
+- Exchange SinglePartition
+- *LocalLimit 5000000
+- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 15 more fields]
+- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner
:- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(querydestinationplace#212, 200)
: +- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 11 more fields]
: +- *SortMergeJoin [Agent#187], [id#89], Inner
: :- *Sort [Agent#187 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(Agent#187, 200)
: : +- *Filter (isnotnull(Agent#187) && isnotnull(querydestinationplace#212))
: : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields]
: +- *Sort [id#89 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#89, 200)
: +- *Project [cast(id#67L as string) AS id#89, name#68]
: +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string)))
: +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75]
+- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cast(airportId#38 as int), 200)
+- *Project [cast(airportId#18L as string) AS airportId#38, countryName#24, cityName#23, airportName#22]
+- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string)))
+- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24]

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
... 45 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(key#250 ASC NULLS FIRST, querydatetime#101 ASC NULLS FIRST, 200)
+- *Project [key#250 AS master_key#588, querydatetime#101 AS master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 19 more fields]
+- *BroadcastHashJoin [key#250, querydatetime#101], [key#590, querydatetime#213], LeftOuter, BuildRight
:- *Project [key#250, querydatetime#101]
: +- BroadcastNestedLoopJoin BuildRight, Cross
: :- Generate explode(pythonUDF0#1633), false, false, [querydatetime#101]
: : +- BatchEvalPython [generate_date_series(start#94, stop#95)], [start#94, stop#95, pythonUDF0#1633]
: : +- Scan ExistingRDD[start#94,stop#95]
: +- BroadcastExchange IdentityBroadcastMode
: +- *HashAggregate(keys=[key#250], functions=[], output=[key#250])
: +- *HashAggregate(keys=[key#250], functions=[], output=[key#250])
: +- *Sample 0.0, 0.001, false, 7736333241016522154
: +- *GlobalLimit 5000000
: +- Exchange SinglePartition
: +- *LocalLimit 5000000
: +- *Project [concat(outboundlegid#190, -, inboundlegid#191, -, agent#187) AS key#250]
: +- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner
: :- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(querydestinationplace#212, 200)
: : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212]
: : +- *SortMergeJoin [agent#187], [id#89], Inner
: : :- *Sort [agent#187 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(agent#187, 200)
: : : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212]
: : : +- *Filter (isnotnull(agent#187) && isnotnull(querydestinationplace#212))
: : : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields]
: : +- *Sort [id#89 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(id#89, 200)
: : +- *Project [cast(id#67L as string) AS id#89]
: : +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string)))
: : +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75]
: +- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cast(airportId#38 as int), 200)
: +- *Project [cast(airportId#18L as string) AS airportId#38]
: +- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string)))
: +- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[34, string, true], input[33, date, true]))
+- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, cast(QueryOutboundDate#183 as date) AS queryoutbounddate#334, cast(QueryInboundDate#184 as date) AS queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, cast(unix_timestamp(OutDeparture#192, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outdeparture#416, cast(unix_timestamp(OutArrival#193, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, cast(unix_timestamp(InDeparture#202, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS indeparture#498, cast(unix_timestamp(InArrival#203, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS inarrival#539, ... 15 more fields]
+- *Sample 0.0, 0.001, false, 7736333241016522154
+- *GlobalLimit 5000000
+- Exchange SinglePartition
+- *LocalLimit 5000000
+- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 15 more fields]
+- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner
:- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(querydestinationplace#212, 200)
: +- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 11 more fields]
: +- *SortMergeJoin [Agent#187], [id#89], Inner
: :- *Sort [Agent#187 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(Agent#187, 200)
: : +- *Filter (isnotnull(Agent#187) && isnotnull(querydestinationplace#212))
: : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields]
: +- *Sort [id#89 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#89, 200)
: +- *Project [cast(id#67L as string) AS id#89, name#68]
: +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string)))
: +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75]
+- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cast(airportId#38 as int), 200)
+- *Project [cast(airportId#18L as string) AS airportId#38, countryName#24, cityName#23, airportName#22]
+- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string)))
+- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24]

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
... 60 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
at 

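Some googling suggests that it fails because of a broadcast timeout?

My code is below. I think it fails near the last part, when writing to Parquet? But the explain log suggests it is also executing the query at that point?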

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import UserDefinedFunction, udf, regexp_replace, to_timestamp, date_format, lit
from datetime import datetime, timedelta
from pyspark.sql.types import ArrayType, StringType, DateType, Row
import math

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# READ IN FLIGHTS, AIRPORTS, AGENTS TABLES
# NOTE: Bookmarks enabled for flights data catalog
airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "airports")
airportsDF = airportsGDF.toDF().select("airportId", "countryName", "cityName", "airportName")
airportsDF = airportsDF.withColumn("airportId", airportsDF["airportId"].cast("string"))
airportsDF.createOrReplaceTempView("airports")

agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "agents")
agentsRawDF = agentsGDF.toDF().select("id", "name")
agentsRawDF = agentsRawDF.withColumn("id", agentsRawDF["id"].cast("string"))
agentsRawDF.createOrReplaceTempView("agents")

def batch(iterable, n=1):
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]

arr = [13301,12929,14511,9968,15280,10193,13531,13439,16122,9498,16162,17210,12728,14534,12542,13303,16716,13311,12913,11036,17471,16240,10902,15526,17294,15671,10858,17482,12071,12337,17521,12274,10032,17396,11052,9970,12917,12195,10658,17409,13078,17416,17388,12118,10438,13113,11170,14213,9762,10871,11780,12392,15518,13536,10724,14260,16747,18490,17402,10284,10982,10431,16743,12482,10497,15168,16587,15412,17106,11017,17368,13804,15461,19461,16923,9794,12795,25396,12952,15422,10101,14147,10485,12210,25336,9449,15395,13947,11893,11109,9921,9799,15253,16945,13164,10031,17002,17152,16516,13180,16451,16437,11336,13428,10182,25405,16955,10180,12191]

def generate_date_series(start, stop):
    return [start + timedelta(days=x) for x in range(0, (stop-start).days + 1)]    

spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()))

def getInterval(num, start, stop, incr): 
    if (num is None):
        return ""

    lower = math.floor(num / incr) * incr
    upper = lower + incr
    return "(%d,%d]" % (lower, upper) 

spark.udf.register("getInterval", getInterval, StringType())
getIntervalUdf = udf(getInterval)

# CREATE DF FOR PAST 90 DAYS EXCLUDING PAST 7 DAYS
today = datetime.utcnow().date()
start = today - timedelta(days = 14) # TODO: CHANGE TO 90
sevenDaysAgo = today - timedelta(days = 7)
print(">>> Generate data frame for ", start, " to ", sevenDaysAgo, "... ")
relaventDatesDf = spark.createDataFrame([
    Row(start=start, stop=sevenDaysAgo)
])
relaventDatesDf.createOrReplaceTempView("relaventDates")

relaventDatesDf = spark.sql("SELECT explode(generate_date_series(start, stop)) AS querydatetime FROM relaventDates")
relaventDatesDf.createOrReplaceTempView("relaventDates")
print("===LOG:Dates===")
relaventDatesDf.show()

flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "flights", transformation_ctx="flights", push_down_predicate="""
    querydatetime BETWEEN '%s' AND '%s' 
    AND querydestinationplace IN (%s)
""" % (start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d"), ",".join(map(lambda s: str(s), arr))))

flightsDf = flightsGDF.toDF()
flightsDf.createOrReplaceTempView("flights")
print("===LOG:STARTING_QUERY===")

resultDf = spark.sql("""
    SELECT 
        f.*, 
        CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key,
        countryName, cityName, airportName, a.name AS agentName
    FROM flights f
    INNER JOIN agents a
    ON f.agent = a.id
    INNER JOIN airports p
    ON f.querydestinationplace = p.airportId
    LIMIT 5000000
""") \
    .sample(False, 0.001)
resultDf.explain(True)

print("===LOG:ADDING_COLUMNS===")
resultDf = resultDf \
    .withColumn("querydatetime", resultDf["querydatetime"].cast("date")) \
    .withColumn("queryoutbounddate", resultDf["queryoutbounddate"].cast("date")) \
    .withColumn("queryinbounddate", resultDf["queryinbounddate"].cast("date")) \
    .withColumn("outdeparture", to_timestamp(resultDf["outdeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \
    .withColumn("outarrival", to_timestamp(resultDf["outarrival"], "yyyy-MM-dd'T'HH:mm:ss")) \
    .withColumn("indeparture", to_timestamp(resultDf["indeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \
    .withColumn("inarrival", to_timestamp(resultDf["inarrival"], "yyyy-MM-dd'T'HH:mm:ss"))

print("===LOG:WRITING_RAW===")
print("===LOG:DONE_WRITING_RAW===")
resultDf.createOrReplaceTempView("flights")

# GET DISTINCT DATASET
# distinctKeysDf = resultDf.select("outboundlegid", "inboundlegid", "agent").groupBy(["outboundlegid", "inboundlegid", "agent"])
distinctKeysDf = spark.sql("""
    SELECT key
    FROM flights
    GROUP BY key
""")
distinctKeysDf.createOrReplaceTempView("distinctKeys")

# GET RELAVENT DATES DATASET
print("===LOG:WRITING_EXPANDED===")
expandedKeyDatesDf = spark.sql("""
    SELECT key, querydatetime
    FROM relaventDates
    CROSS JOIN distinctKeys
""")

expandedKeyDatesDf.createOrReplaceTempView("expandedKeyDates")
print("===LOG:DONE_WRITING_EXPANDED===")

cleanedFlightsDf = spark.sql("""
    SELECT 
        e.key AS master_key, 
        e.querydatetime AS master_querydatetime, 
        f.*
    FROM expandedKeyDates e
    LEFT JOIN flights f
    ON e.key = f.key
    AND e.querydatetime = f.querydatetime
    ORDER BY e.key, e.querydatetime
""")
cleanedFlightsDf = cleanedFlightsDf \
    .withColumn("created_day", date_format(cleanedFlightsDf["querydatetime"], "EEEE")) \
    .withColumn("created_month", date_format(cleanedFlightsDf["querydatetime"], "yyyy-MM")) \
    .withColumn("created_month_m", date_format(cleanedFlightsDf["querydatetime"], "M").cast("int")) \
    .withColumn("created_week", date_format(cleanedFlightsDf["querydatetime"], "w").cast("int")) \
    .withColumn("out_day", date_format(cleanedFlightsDf["outdeparture"], "EEE")) \
    .withColumn("out_month", date_format(cleanedFlightsDf["outdeparture"], "yyyy-MM")) \
    .withColumn("out_month_m", date_format(cleanedFlightsDf["outdeparture"], "M").cast("int")) \
    .withColumn("out_week", date_format(cleanedFlightsDf["outdeparture"], "w").cast("int")) \
    .withColumn("out_departure_interval", getIntervalUdf(date_format(cleanedFlightsDf["outdeparture"], "H").cast("int"), lit(0), lit(24), lit(4))) \
    .withColumn("out_hour", date_format(cleanedFlightsDf["outdeparture"], "k").cast("int")) \
    .withColumn("in_day", date_format(cleanedFlightsDf["indeparture"], "EEE")) \
    .withColumn("in_month", date_format(cleanedFlightsDf["indeparture"], "yyyy-MM")) \
    .withColumn("in_month_m", date_format(cleanedFlightsDf["indeparture"], "M").cast("int")) \
    .withColumn("in_week", date_format(cleanedFlightsDf["indeparture"], "w").cast("int")) \
    .withColumn("in_departure_interval", getIntervalUdf(date_format(cleanedFlightsDf["indeparture"], "H").cast("int"), lit(0), lit(24), lit(4))) \
    .withColumn("in_hour", date_format(cleanedFlightsDf["indeparture"], "k").cast("int"))

print("===LOG:WRITING_CLEANED===")
cleanedFlightsDf \
    .repartition("countryName", "querydatetime") \
    .write \
    .mode("overwrite") \
    .partitionBy(["countryName", "querydatetime"]) \
    .parquet("s3://xxx-glue/cleanedFlights")
print("===LOG:DONE_WRITING_CLEANED===")

print("===LOG:DONE BATCH %s" % (batch))

job.commit()

Write the data using glueContext.write_from_options().

The weakest point of your code is the following:

LIMIT 5000000

If you take a closer look at the execution plan

: +- *GlobalLimit 5000000
: +- Exchange SinglePartition
: +- *LocalLimit 5000000

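you'll see that the implementation uses a two-step process, in which the partial (per-partition) limits are collected into a single partition. With a number this large (LIMIT is simply not designed with such a scenario in mind) you can easily overwhelm the corresponding executor.

Additionally, the LIMIT in your code is redundant, since you follow it with .sample(False, 0.001).

I'd recommend dropping the LIMIT clause and adjusting the fraction accordingly: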
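To make the two-step process concrete, here is a toy model in plain Python (not Spark code). The partition layout (200 partitions of 100,000 rows each) and both function names are hypothetical, chosen only for illustration; the point is that every partition may contribute up to the full limit before the single task truncates the result.

```python
from typing import List


def local_then_global_limit(partitions: List[List[int]], n: int) -> List[int]:
    """Toy model of LocalLimit -> Exchange SinglePartition -> GlobalLimit."""
    # Step 1 (LocalLimit): every partition keeps at most n rows.
    local = [part[:n] for part in partitions]
    # Step 2 (Exchange SinglePartition): all surviving rows are moved
    # into one partition, i.e. they are handled by a single task.
    single_partition = [row for part in local for row in part]
    # Step 3 (GlobalLimit): that single task truncates to n rows.
    return single_partition[:n]


def rows_sent_to_single_partition(partition_sizes: List[int], n: int) -> int:
    """Rows the single task receives in this toy model."""
    return sum(min(size, n) for size in partition_sizes)


# Hypothetical layout: 200 partitions of 100,000 rows each.
sizes = [100_000] * 200
shuffled = rows_sent_to_single_partition(sizes, 5_000_000)
print(shuffled)
```

In this made-up layout no partition is large enough for the local limit to discard anything, so all rows end up on one task before the global limit is applied.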

result_full = spark.sql("""
    SELECT 
        f.*, 
        CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key,
        countryName, cityName, airportName, a.name AS agentName
    FROM flights f
    INNER JOIN agents a
    ON f.agent = a.id
    INNER JOIN airports p
    ON f.querydestinationplace = p.airportId
""")

desired_size = 5000000 * 0.001  # row count the original LIMIT + sample aimed for
fraction = desired_size / result_full.count()
assert 0 < fraction < 1  # sample() expects a fraction between 0 and 1

result_sample = result_full.sample(False, fraction)
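The fraction arithmetic above can be checked without a cluster. The helper below is a minimal sketch in plain Python; sample_fraction is a hypothetical name, the row count of 2,000,000 is made up, and clamping to 1.0 is an assumption about what you would want when the table is smaller than the desired sample.

```python
def sample_fraction(desired_rows: float, total_rows: int) -> float:
    """Fraction to pass to DataFrame.sample to get roughly desired_rows rows."""
    if total_rows <= 0:
        raise ValueError("total_rows must be positive")
    if desired_rows <= 0:
        raise ValueError("desired_rows must be positive")
    # Clamp to 1.0 (assumption): if fewer rows exist than desired, keep all.
    return min(1.0, desired_rows / total_rows)


# Hypothetical count: pretend result_full.count() returned 2,000,000.
fraction = sample_fraction(5000, 2_000_000)
print(fraction)
```

Note that sample() without replacement is probabilistic, so the resulting row count will only be approximately the desired size.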

Additionally, I'd recommend rewriting generate_date_series

from pyspark.sql.functions import lit
from pyspark.sql import SparkSession


def generate_date_series(start, stop):
    span = (stop - start).days + 1
    return (SparkSession.builder.getOrCreate()
               .range(0, span)
               .withColumn("start", lit(start))
               # cast explicitly: range() yields a bigint id, date_add expects an int
               .selectExpr("date_add(start, CAST(id AS INT)) AS querydatetime"))


(generate_date_series(start, sevenDaysAgo)
    .createOrReplaceTempView("relaventDates"))

Finally, I strongly recommend replacing the getInterval UDF with a composition of built-in functions* (unused arguments preserved as-is):

from pyspark.sql import Column
from pyspark.sql.functions import concat, floor, lit


def get_interval(num, start, stop, incr):
    assert isinstance(num, Column)

    lower = floor(num / incr).cast("integer") * incr
    upper = lower + incr
    # closing bracket matches the "(%d,%d]" format produced by the original UDF
    return concat(lit("("), lower, lit(","), upper, lit("]"))

which can later be used as a direct replacement for the UDF, although it is unlikely to directly resolve your current problem.

from pyspark.sql.functions import hour

...
    .withColumn(
        "out_departure_interval",
        get_interval(hour("outdeparture"), 0, 24, 4))

As a side note, UDFRegistration.register has returned a callable for a few versions now, so you might be able to replace

spark.udf.register("getInterval", getInterval, StringType())
getIntervalUdf = udf(getInterval)

with

getIntervalUdf = spark.udf.register("getInterval", getInterval, StringType())

* You might also consider using the dedicated window function:

Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).
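The half-open semantics in that description can be illustrated in plain Python. This is a toy model, not the Spark function: tumbling_window is a hypothetical helper, and aligning windows to midnight is an assumption that keeps the example short and only behaves sensibly for widths that divide a day evenly.

```python
from datetime import datetime, timedelta
from typing import Tuple


def tumbling_window(ts: datetime, minutes: int) -> Tuple[datetime, datetime]:
    """Return the half-open window [start, end) of the given width containing ts."""
    width = timedelta(minutes=minutes)
    # Assumption: windows are aligned to midnight of the same day.
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    # Number of whole windows that have elapsed since midnight.
    index = (ts - midnight) // width
    start = midnight + index * width
    return start, start + width


# 12:05 falls in [12:05, 12:10), not in [12:00, 12:05).
five_min = tumbling_window(datetime(2019, 2, 6, 12, 5), 5)
# With 4-hour windows, 13:30 falls in [12:00, 16:00).
four_hour = tumbling_window(datetime(2019, 2, 6, 13, 30), 240)
print(five_min, four_hour)
```

The 4-hour case mirrors the departure-interval buckets that getInterval computes from the hour of day.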

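It looks like a broadcast join that is timing out. The default timeout is 300 seconds, and you can start by increasing it to a couple of hours: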

spark.conf.set("spark.sql.broadcastTimeout", 7200)

Now, if a broadcast really takes hours, sending that much data to all nodes is not a good idea, and you may want to disable broadcast joins:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

or fine-tune the code to avoid confusing the query planner.