为什么在使用 Range Join 提示时会出现异常?
Why am I getting an exception when using a Range Join hint?
我正在尝试使用 DataFrame.hint() method to add a Range Join hint 加入。
我有两个 table:minutes
和 events
。
分钟 table 有 minute_start
和 minute_end
列,它们是自固定时刻以来的秒数。当然,它们的值是60的倍数。
事件 table 具有类似的 event_start
和 event_end
列,仅针对事件。活动可以随时开始和结束。
对于每个事件,我需要找到它重叠的所有分钟数。
我正在 Databricks 上尝试这个(运行时 5.1,Python 3.5):
# from pyspark.sql.types import StructType, StructField, IntegerType
# minutes = spark.sparkContext\
# .parallelize(((0, 60),
# (60, 120)))\
# .toDF(StructType([
# StructField('minute_start', IntegerType()),
# StructField('minute_end', IntegerType())
# ]))
# events = spark.sparkContext\
# .parallelize(((12, 33),
# (0, 120),
# (33, 72),
# (65, 178)))\
# .toDF(StructType([
# StructField('event_start', IntegerType()),
# StructField('event_end', IntegerType())
# ]))
events.hint("range_join", "60")\
.join(minutes,
on=[events.event_start < minutes.minute_end,
minutes.minute_start < events.event_end])\
.orderBy(events.event_start,
events.event_end,
minutes.minute_start)\
.show()
没有 hint
调用,结果符合预期:
+-----------+---------+------------+----------+
|event_start|event_end|minute_start|minute_end|
+-----------+---------+------------+----------+
| 0| 120| 0| 60|
| 0| 120| 60| 120|
| 12| 33| 0| 60|
| 33| 72| 0| 60|
| 33| 72| 60| 120|
| 65| 178| 60| 120|
+-----------+---------+------------+----------+
对于 hint
,我得到异常:
AnalysisException: 'Range join hint: invalid arguments Buffer(60);'
当我尝试在提示中将 60
作为数字而不是字符串传递时,它抱怨提示的参数必须是字符串。
我不在 Azure,但我希望结果是一样的。
有没有人遇到过类似的问题并找到了解决方案或者知道我哪里出错了?
更新 1
(目前,我正在 Databricks Runtime 6.1、Python 3.7.3、Spark 2.4.4 上试用)
我以为我错过了参数应该是可迭代的,所以我再次尝试 events.hint("range_join", [60])
。同样抱怨参数不是字符串:TypeError: all parameters should be str, got 60 of type <class 'int'>
.
我想知道 Databricks 的 Spark 版本是否落后。
这是在 Spark 中 source code on GitHub:
def hint(self, name, *parameters):
... (no checks on `parameters` up to here)
allowed_types = (basestring, list, float, int)
for p in parameters:
if not isinstance(p, allowed_types):
raise TypeError(
"all parameters should be in {0}, got {1} of type {2}".format(
allowed_types, p, type(p)))
... (no checks beyond this point)
所以应该允许 int
的列表。
我得到的是 all parameters should be str
,但是如果我传递了错误类型的参数,GitHub 版本会说 all parameters should be in (basestring, list, float, int)
。
更新 2
hint("skew", "col_name")
似乎有效。
我在 GitHub 上查看了 Spark 源代码。
Version 2.4.4 有这个:
def hint(self, name, *parameters):
... # no checks on `parameters` up to here
for p in parameters:
if not isinstance(p, str):
raise TypeError(
"all parameters should be str, got {0} of type {1}".format(p, type(p)))
... # no checks beyond here
但是从 version 3.0.0-preview-rc1 开始,来源是这样的:
def hint(self, name, *parameters):
... # no checks on `parameters` up to here
allowed_types = (basestring, list, float, int)
for p in parameters:
if not isinstance(p, allowed_types):
raise TypeError(
"all parameters should be in {0}, got {1} of type {2}".format(
allowed_types, p, type(p)))
... # no checks beyond here
看来 2.4.4 版本有一个错误,从 3.0.0-preview-rc1 开始的版本已经修复了。
我正在尝试使用 DataFrame.hint() method to add a Range Join hint 加入。
我有两个 table:minutes
和 events
。
分钟 table 有 minute_start
和 minute_end
列,它们是自固定时刻以来的秒数。当然,它们的值是60的倍数。
事件 table 具有类似的 event_start
和 event_end
列,仅针对事件。活动可以随时开始和结束。
对于每个事件,我需要找到它重叠的所有分钟数。
我正在 Databricks 上尝试这个(运行时 5.1,Python 3.5):
# from pyspark.sql.types import StructType, StructField, IntegerType
# minutes = spark.sparkContext\
# .parallelize(((0, 60),
# (60, 120)))\
# .toDF(StructType([
# StructField('minute_start', IntegerType()),
# StructField('minute_end', IntegerType())
# ]))
# events = spark.sparkContext\
# .parallelize(((12, 33),
# (0, 120),
# (33, 72),
# (65, 178)))\
# .toDF(StructType([
# StructField('event_start', IntegerType()),
# StructField('event_end', IntegerType())
# ]))
events.hint("range_join", "60")\
.join(minutes,
on=[events.event_start < minutes.minute_end,
minutes.minute_start < events.event_end])\
.orderBy(events.event_start,
events.event_end,
minutes.minute_start)\
.show()
没有 hint
调用,结果符合预期:
+-----------+---------+------------+----------+
|event_start|event_end|minute_start|minute_end|
+-----------+---------+------------+----------+
| 0| 120| 0| 60|
| 0| 120| 60| 120|
| 12| 33| 0| 60|
| 33| 72| 0| 60|
| 33| 72| 60| 120|
| 65| 178| 60| 120|
+-----------+---------+------------+----------+
对于 hint
,我得到异常:
AnalysisException: 'Range join hint: invalid arguments Buffer(60);'
当我尝试在提示中将 60
作为数字而不是字符串传递时,它抱怨提示的参数必须是字符串。
我不在 Azure,但我希望结果是一样的。
有没有人遇到过类似的问题并找到了解决方案或者知道我哪里出错了?
更新 1
(目前,我正在 Databricks Runtime 6.1、Python 3.7.3、Spark 2.4.4 上试用)
我以为我错过了参数应该是可迭代的,所以我再次尝试 events.hint("range_join", [60])
。同样抱怨参数不是字符串:TypeError: all parameters should be str, got 60 of type <class 'int'>
.
我想知道 Databricks 的 Spark 版本是否落后。
这是在 Spark 中 source code on GitHub:
def hint(self, name, *parameters):
... (no checks on `parameters` up to here)
allowed_types = (basestring, list, float, int)
for p in parameters:
if not isinstance(p, allowed_types):
raise TypeError(
"all parameters should be in {0}, got {1} of type {2}".format(
allowed_types, p, type(p)))
... (no checks beyond this point)
所以应该允许 int
的列表。
我得到的是 all parameters should be str
,但是如果我传递了错误类型的参数,GitHub 版本会说 all parameters should be in (basestring, list, float, int)
。
更新 2
hint("skew", "col_name")
似乎有效。
我在 GitHub 上查看了 Spark 源代码。
Version 2.4.4 有这个:
def hint(self, name, *parameters):
... # no checks on `parameters` up to here
for p in parameters:
if not isinstance(p, str):
raise TypeError(
"all parameters should be str, got {0} of type {1}".format(p, type(p)))
... # no checks beyond here
但是从 version 3.0.0-preview-rc1 开始,来源是这样的:
def hint(self, name, *parameters):
... # no checks on `parameters` up to here
allowed_types = (basestring, list, float, int)
for p in parameters:
if not isinstance(p, allowed_types):
raise TypeError(
"all parameters should be in {0}, got {1} of type {2}".format(
allowed_types, p, type(p)))
... # no checks beyond here
看来 2.4.4 版本有一个错误,从 3.0.0-preview-rc1 开始的版本已经修复了。