可以在纯 Spark SQL 中使用 Spark Pandas UDF 吗?

Possible to use Spark Pandas UDF in pure Spark SQL?

这个有效:

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
import pandas as pd

spark = SparkSession.builder.getOrCreate()

@pandas_udf(returnType="long")
def add_one(v: pd.Series) -> pd.Series:
    return v.add(1)

spark.udf.register("add_one", add_one)

spark.sql("select add_one(1)").show()

不过,我想知道 if/how 我可以做以下工作:

$ spark-sql -e 'select add_one(1)'

现在,如果可以使用它,那就太好了。

恐怕这目前是不可能的。有趣的是,实际上没有人提到它。

其实在apache spark文档中的小记中“隐藏”的信息:

Note that the Spark SQL CLI cannot talk to the Thrift JDBC server.

正如您可能理解的含义,这意味着您不能从 CLI spark-sql 调用 UDFs。这是文档的 link

可以仔细检查 bin/spark-sql source code at github 实际做了什么:

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "[=10=]")"/find-spark-home
fi

export _SPARK_CMD_USAGE="Usage: ./bin/spark-sql [options] [cli option]"
exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"

这再次确认,因为它提交给 thriftserver,您不能在 spark-sql CLI 中使用 UDF

Pandas UDF 是向量化的 UDF,旨在避免 PySpark 中的逐行迭代。注册这些 UDF 后,它们的行为就像 PySpark 函数 API。他们将 运行 居住在 Python 名工人体内。

如@tukan 所述,Spark SQL CLI 无法与 JDBC 服务器通信。所以,Spark 本身并不支持这个。

但是,您可以进行自定义 RPC 调用以直接调用它,但这并不像您最初想要做的那样简单或相同。

目前无法以您想要的方式使用 Python UDF。但是该选项可用于 Scala/Java UDF,因此如果您愿意使用 Scala/Java,这是一种方法。注意:我将 HiveUDF 实现为 Spark supports HiveUDF.

您需要做的第一件事是使用以下示例结构创建一个 Java 项目:

root
| - pom.xml
| - src/main/com/test/udf/SimpleConcatUDF.java

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.test.udf</groupId>
  <artifactId>simple-concat-udf</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <hive.version>3.1.2</hive.version>
  </properties>

  <repositories>
    <repository>
      <id>hortonworks</id>
      <url>http://repo.hortonworks.com/content/groups/public</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>${hive.version}</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>2.3.2</version>
          <configuration>
            <source>1.6</source>
            <target>1.6</target>
          </configuration>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-eclipse-plugin</artifactId>
          <version>2.9</version>
          <configuration>
            <useProjectReferences>false</useProjectReferences>
          </configuration>
        </plugin>
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <configuration>
            <appendAssemblyId>false</appendAssemblyId>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass>com.test.udf.SimpleConcatUDF</mainClass>
              </manifest>
            </archive>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

</project>

SimpleConcatUDF.java

package com.test.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class SimpleConcatUDF extends UDF {

  public String evaluate(final Text text) {
    return text.toString() + "_from_udf";
  }

}

您接下来要做的是编译和打包它。我正在使用 maven 所以标准命令是:

cd <project-root-path>/
mvn clean install
# output jar file is located at <project-root-path>/target/simple-concat-udf-1.0-SNAPSHOT.jar

最后,您需要使用 create function 进行注册。如果您将该功能注册为永久功能,则只需 一次 即可。否则,您可以将其注册为临时。

spark-sql> create function simple_concat AS 'com.test.udf.SimpleConcatUDF' using jar '<project-root-path>/target/simple-concat-udf-1.0-SNAPSHOT.jar';
spark-sql> show user functions;
default.simple_concat
Time taken: 1.868 seconds, Fetched 1 row(s)
spark-sql> select simple_concat('a');
a_from_udf
Time taken: 0.079 seconds, Fetched 1 row(s)

注意:如果您的系统中有 HDFS,您需要将 jar 文件复制到 HDFS 并使用该 HDFS 路径而不是像上面那样的本地路径创建函数。