可以在纯 Spark SQL 中使用 Spark Pandas UDF 吗?
Possible to use Spark Pandas UDF in pure Spark SQL?
这个有效:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
import pandas as pd
spark = SparkSession.builder.getOrCreate()
@pandas_udf(returnType="long")
def add_one(v: pd.Series) -> pd.Series:
return v.add(1)
spark.udf.register("add_one", add_one)
spark.sql("select add_one(1)").show()
不过,我想知道 if/how 我可以做以下工作:
$ spark-sql -e 'select add_one(1)'
现在,如果可以使用它,那就太好了。
恐怕这目前是不可能的。有趣的是,实际上没有人提到它。
其实在apache spark文档中的小记中“隐藏”的信息:
Note that the Spark SQL CLI cannot talk to the Thrift JDBC server.
正如您可能理解的含义,这意味着您不能从 CLI spark-sql
调用 UDFs
。这是文档的 link。
可以仔细检查 bin/spark-sql
source code at github 实际做了什么:
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "[=10=]")"/find-spark-home
fi
export _SPARK_CMD_USAGE="Usage: ./bin/spark-sql [options] [cli option]"
exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"
这再次确认,因为它提交给 thriftserver
,您不能在 spark-sql
CLI 中使用 UDF
。
Pandas UDF 是向量化的 UDF,旨在避免 PySpark 中的逐行迭代。注册这些 UDF 后,它们的行为就像 PySpark 函数 API。他们将 运行 居住在 Python 名工人体内。
如@tukan 所述,Spark SQL CLI 无法与 JDBC 服务器通信。所以,Spark 本身并不支持这个。
但是,您可以进行自定义 RPC 调用以直接调用它,但这并不像您最初想要做的那样简单或相同。
目前无法以您想要的方式使用 Python UDF。但是该选项可用于 Scala/Java UDF,因此如果您愿意使用 Scala/Java,这是一种方法。注意:我将 HiveUDF 实现为 Spark supports HiveUDF.
您需要做的第一件事是使用以下示例结构创建一个 Java 项目:
root
| - pom.xml
| - src/main/com/test/udf/SimpleConcatUDF.java
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test.udf</groupId>
<artifactId>simple-concat-udf</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<hive.version>3.1.2</hive.version>
</properties>
<repositories>
<repository>
<id>hortonworks</id>
<url>http://repo.hortonworks.com/content/groups/public</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.9</version>
<configuration>
<useProjectReferences>false</useProjectReferences>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.test.udf.SimpleConcatUDF</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
SimpleConcatUDF.java
package com.test.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class SimpleConcatUDF extends UDF {
public String evaluate(final Text text) {
return text.toString() + "_from_udf";
}
}
您接下来要做的是编译和打包它。我正在使用 maven 所以标准命令是:
cd <project-root-path>/
mvn clean install
# output jar file is located at <project-root-path>/target/simple-concat-udf-1.0-SNAPSHOT.jar
最后,您需要使用 create function
进行注册。如果您将该功能注册为永久功能,则只需 一次 即可。否则,您可以将其注册为临时。
spark-sql> create function simple_concat AS 'com.test.udf.SimpleConcatUDF' using jar '<project-root-path>/target/simple-concat-udf-1.0-SNAPSHOT.jar';
spark-sql> show user functions;
default.simple_concat
Time taken: 1.868 seconds, Fetched 1 row(s)
spark-sql> select simple_concat('a');
a_from_udf
Time taken: 0.079 seconds, Fetched 1 row(s)
注意:如果您的系统中有 HDFS,您需要将 jar 文件复制到 HDFS 并使用该 HDFS 路径而不是像上面那样的本地路径创建函数。
这个有效:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
import pandas as pd
spark = SparkSession.builder.getOrCreate()
@pandas_udf(returnType="long")
def add_one(v: pd.Series) -> pd.Series:
return v.add(1)
spark.udf.register("add_one", add_one)
spark.sql("select add_one(1)").show()
不过,我想知道 if/how 我可以做以下工作:
$ spark-sql -e 'select add_one(1)'
现在,如果可以使用它,那就太好了。
恐怕这目前是不可能的。有趣的是,实际上没有人提到它。
其实在apache spark文档中的小记中“隐藏”的信息:
Note that the Spark SQL CLI cannot talk to the Thrift JDBC server.
正如您可能理解的含义,这意味着您不能从 CLI spark-sql
调用 UDFs
。这是文档的 link。
可以仔细检查 bin/spark-sql
source code at github 实际做了什么:
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "[=10=]")"/find-spark-home
fi
export _SPARK_CMD_USAGE="Usage: ./bin/spark-sql [options] [cli option]"
exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"
这再次确认,因为它提交给 thriftserver
,您不能在 spark-sql
CLI 中使用 UDF
。
Pandas UDF 是向量化的 UDF,旨在避免 PySpark 中的逐行迭代。注册这些 UDF 后,它们的行为就像 PySpark 函数 API。他们将 运行 居住在 Python 名工人体内。
如@tukan 所述,Spark SQL CLI 无法与 JDBC 服务器通信。所以,Spark 本身并不支持这个。
但是,您可以进行自定义 RPC 调用以直接调用它,但这并不像您最初想要做的那样简单或相同。
目前无法以您想要的方式使用 Python UDF。但是该选项可用于 Scala/Java UDF,因此如果您愿意使用 Scala/Java,这是一种方法。注意:我将 HiveUDF 实现为 Spark supports HiveUDF.
您需要做的第一件事是使用以下示例结构创建一个 Java 项目:
root
| - pom.xml
| - src/main/com/test/udf/SimpleConcatUDF.java
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test.udf</groupId>
<artifactId>simple-concat-udf</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<hive.version>3.1.2</hive.version>
</properties>
<repositories>
<repository>
<id>hortonworks</id>
<url>http://repo.hortonworks.com/content/groups/public</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.9</version>
<configuration>
<useProjectReferences>false</useProjectReferences>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.test.udf.SimpleConcatUDF</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
SimpleConcatUDF.java
package com.test.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class SimpleConcatUDF extends UDF {
public String evaluate(final Text text) {
return text.toString() + "_from_udf";
}
}
您接下来要做的是编译和打包它。我正在使用 maven 所以标准命令是:
cd <project-root-path>/
mvn clean install
# output jar file is located at <project-root-path>/target/simple-concat-udf-1.0-SNAPSHOT.jar
最后,您需要使用 create function
进行注册。如果您将该功能注册为永久功能,则只需 一次 即可。否则,您可以将其注册为临时。
spark-sql> create function simple_concat AS 'com.test.udf.SimpleConcatUDF' using jar '<project-root-path>/target/simple-concat-udf-1.0-SNAPSHOT.jar';
spark-sql> show user functions;
default.simple_concat
Time taken: 1.868 seconds, Fetched 1 row(s)
spark-sql> select simple_concat('a');
a_from_udf
Time taken: 0.079 seconds, Fetched 1 row(s)
注意:如果您的系统中有 HDFS,您需要将 jar 文件复制到 HDFS 并使用该 HDFS 路径而不是像上面那样的本地路径创建函数。