Impala 从 Impala 迁移到 SparkSQL 时内置函数不可用

Impala built-in function not available when migrating from Impala to SparkSQL

我正在使用 Impala 中的内置函数,例如:

select id, parse_url(my_table.url, "QUERY", "extensionId") from my_table

现在我正在迁移到 SparkSQL(在 Jupyter Notebook 中使用 pyspark):

my_table.select(my_table.id.cast('string'), parse_url(my_table.url.cast('string'), "QUERY", "extensionId")).show()

但是,我收到以下错误:

NameError: name 'parse_url' is not defined

也试过以下:

my_table.registerTempTable("my_table")

sqlContext.sql("select id, url, parse_url(url, 'QUERY', 'extensionId') as new_url from my_table").show(100)

但是所有的new_url都变成了null

知道我错过了什么吗?另外,人们将如何处理这样的问题?谢谢!

一些缺失的部分:

  • 您无法使用 Spark 执行 Impala 函数。
  • 有一个具有相同名称和语法的 Hive UDF 可以与 Spark 一起使用,但它没有本机实现和函数包装器。这就是为什么它可以通过 SQL 使用 HiveContext / SparkSession 和 Hive 支持来调用。

一般来说它应该工作得很好:

spark.sql("""SELECT parse_url(
    'http://example.com?extensionId=foo', 'QUERY', 'extensionId'
)""").show()
+-----------------------------------------------------------------+
|parse_url(http://example.com?extensionId=foo, QUERY, extensionId)|
+-----------------------------------------------------------------+
|                                                              foo|
+-----------------------------------------------------------------+

NULL输出表示给定部分无法匹配:

spark.sql("""SELECT parse_url(
    'http://example.com?bar=foo', 'QUERY', 'extensionId'
)""").show()
+---------------------------------------------------------+
|parse_url(http://example.com?bar=foo, QUERY, extensionId)|
+---------------------------------------------------------+
|                                                     null|
+---------------------------------------------------------+

您可以使用 UDF 获得类似的结果,但速度会慢得多。

from typing import Dict
from urllib.parse import parse_qsl, urlsplit
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, MapType

def parse_args(col: str) -> Dict[str, str]:
    """
    
    """
    try:
        return dict(parse_qsl(urlsplit(col).query))
    except:
        pass

parse_args_ = udf(parse_args, MapType(StringType(), StringType()))

数据定义为:

df = sc.parallelize([
    ("http://example.com?bar=foo", ),
    ("http://example.com?extensionId=foo", ),
]).toDF(["url"])

可以这样使用:

df.select(parse_args_("url")["extensionId"]).show()

结果为:

+----------------------------+
|parse_args(url)[extensionId]|
+----------------------------+
|                        null|
|                         foo|
+----------------------------+