Impala 从 Impala 迁移到 SparkSQL 时内置函数不可用
Impala built-in function not available when migrating from Impala to SparkSQL
我正在使用 Impala 中的内置函数,例如:
select id, parse_url(my_table.url, "QUERY", "extensionId") from my_table
现在我正在迁移到 SparkSQL(在 Jupyter Notebook 中使用 pyspark):
my_table.select(my_table.id.cast('string'), parse_url(my_table.url.cast('string'), "QUERY", "extensionId")).show()
但是,我收到以下错误:
NameError: name 'parse_url' is not defined
也试过以下:
my_table.registerTempTable("my_table")
sqlContext.sql("select id, url, parse_url(url, 'QUERY', 'extensionId') as new_url from my_table").show(100)
但是所有的new_url
都变成了null
。
知道我错过了什么吗?另外,人们将如何处理这样的问题?谢谢!
一些缺失的部分:
- 您无法使用 Spark 执行 Impala 函数。
- 有一个具有相同名称和语法的 Hive UDF 可以与 Spark 一起使用,但它没有本机实现和函数包装器。这就是为什么它可以通过 SQL 使用
HiveContext
/ SparkSession
和 Hive 支持来调用。
一般来说它应该工作得很好:
spark.sql("""SELECT parse_url(
'http://example.com?extensionId=foo', 'QUERY', 'extensionId'
)""").show()
+-----------------------------------------------------------------+
|parse_url(http://example.com?extensionId=foo, QUERY, extensionId)|
+-----------------------------------------------------------------+
| foo|
+-----------------------------------------------------------------+
和NULL
输出表示给定部分无法匹配:
spark.sql("""SELECT parse_url(
'http://example.com?bar=foo', 'QUERY', 'extensionId'
)""").show()
+---------------------------------------------------------+
|parse_url(http://example.com?bar=foo, QUERY, extensionId)|
+---------------------------------------------------------+
| null|
+---------------------------------------------------------+
您可以使用 UDF 获得类似的结果,但速度会慢得多。
from typing import Dict
from urllib.parse import parse_qsl, urlsplit
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, MapType
def parse_args(col: str) -> Dict[str, str]:
"""
"""
try:
return dict(parse_qsl(urlsplit(col).query))
except:
pass
parse_args_ = udf(parse_args, MapType(StringType(), StringType()))
数据定义为:
df = sc.parallelize([
("http://example.com?bar=foo", ),
("http://example.com?extensionId=foo", ),
]).toDF(["url"])
可以这样使用:
df.select(parse_args_("url")["extensionId"]).show()
结果为:
+----------------------------+
|parse_args(url)[extensionId]|
+----------------------------+
| null|
| foo|
+----------------------------+
我正在使用 Impala 中的内置函数,例如:
select id, parse_url(my_table.url, "QUERY", "extensionId") from my_table
现在我正在迁移到 SparkSQL(在 Jupyter Notebook 中使用 pyspark):
my_table.select(my_table.id.cast('string'), parse_url(my_table.url.cast('string'), "QUERY", "extensionId")).show()
但是,我收到以下错误:
NameError: name 'parse_url' is not defined
也试过以下:
my_table.registerTempTable("my_table")
sqlContext.sql("select id, url, parse_url(url, 'QUERY', 'extensionId') as new_url from my_table").show(100)
但是所有的new_url
都变成了null
。
知道我错过了什么吗?另外,人们将如何处理这样的问题?谢谢!
一些缺失的部分:
- 您无法使用 Spark 执行 Impala 函数。
- 有一个具有相同名称和语法的 Hive UDF 可以与 Spark 一起使用,但它没有本机实现和函数包装器。这就是为什么它可以通过 SQL 使用
HiveContext
/SparkSession
和 Hive 支持来调用。
一般来说它应该工作得很好:
spark.sql("""SELECT parse_url(
'http://example.com?extensionId=foo', 'QUERY', 'extensionId'
)""").show()
+-----------------------------------------------------------------+
|parse_url(http://example.com?extensionId=foo, QUERY, extensionId)|
+-----------------------------------------------------------------+
| foo|
+-----------------------------------------------------------------+
和NULL
输出表示给定部分无法匹配:
spark.sql("""SELECT parse_url(
'http://example.com?bar=foo', 'QUERY', 'extensionId'
)""").show()
+---------------------------------------------------------+
|parse_url(http://example.com?bar=foo, QUERY, extensionId)|
+---------------------------------------------------------+
| null|
+---------------------------------------------------------+
您可以使用 UDF 获得类似的结果,但速度会慢得多。
from typing import Dict
from urllib.parse import parse_qsl, urlsplit
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, MapType
def parse_args(col: str) -> Dict[str, str]:
"""
"""
try:
return dict(parse_qsl(urlsplit(col).query))
except:
pass
parse_args_ = udf(parse_args, MapType(StringType(), StringType()))
数据定义为:
df = sc.parallelize([
("http://example.com?bar=foo", ),
("http://example.com?extensionId=foo", ),
]).toDF(["url"])
可以这样使用:
df.select(parse_args_("url")["extensionId"]).show()
结果为:
+----------------------------+
|parse_args(url)[extensionId]|
+----------------------------+
| null|
| foo|
+----------------------------+