PyFlink - 在 JAR 中使用 Scala UDF 的问题
PyFlink - Issue using Scala UDF in JAR
我正在尝试使用外部 JAR 在 Pyflink 中注册 Scala UDF,如下所示,但出现以下错误。
Scala UDF:
package com.dummy
import org.apache.flink.table.functions.ScalarFunction
class dummyTransform(factor: Int) extends ScalarFunction {
def eval(s: String): Int = {
s.hashCode()
}
}
build.sbt:
name := "hello_scala_for_flink"
version := "0.1"
scalaVersion := "2.12.11"
libraryDependencies += "org.apache.flink" % "flink-table-common" % "1.11.2" % "provided"
assembly.sbt:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
Scala 项目结构:
hello_scala_for_flink/
project/
src/
main/
resources/
scala/
com.dummy/
dummyTransform
hello_scala
test
target/
build.sbt
shell:
cd hello_scala_for_flink
sbt assembly
cp ./target/scala-2.12/hello_scala_for_flink-assembly-0.1.jar /Users/py-r/opt/anaconda3/envs/venv_pyflink_37/lib/python3.7/site-packages/pyflink/lib/
Python:
from pyflink.dataset import ExecutionEnvironment
exec_env = ExecutionEnvironment.get_execution_environment()
#exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
table_env = BatchTableEnvironment.create(exec_env, t_config)
table_env.register_java_function("hash_code","com.dummy.dummyTransform")
错误(包括重启 Anaconda 之后):
Py4JJavaError: An error occurred while calling o12.newInstance.
: java.lang.InstantiationException: com.dummy.dummyTransform
at java.base/java.lang.Class.newInstance(Class.java:598)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.NoSuchMethodException: com.dummy.dummyTransform.<init>()
at java.base/java.lang.Class.getConstructor0(Class.java:3427)
at java.base/java.lang.Class.newInstance(Class.java:585)
11 more
版本:
jdk = 1.8.0_151.jdk
scala = 2.12.11
python = 3.7
apache-beam = 2.19.0
apache-flink = 1.11.2
知道问题出在哪里吗?
感谢您的支持
看来我自己发现了问题。显然上面的代码只需要 class 实例化:
class dummyTransform(factor: Int) extends ScalarFunction {
def eval(s: String): Int = {
s.hashCode() * factor
}
def this() = this(1)
}
此外,由于另一个错误,我更改为 Scala 2.11.12。现在一切似乎都正常了:太棒了!
我正在尝试使用外部 JAR 在 Pyflink 中注册 Scala UDF,如下所示,但出现以下错误。
Scala UDF:
package com.dummy
import org.apache.flink.table.functions.ScalarFunction
class dummyTransform(factor: Int) extends ScalarFunction {
def eval(s: String): Int = {
s.hashCode()
}
}
build.sbt:
name := "hello_scala_for_flink"
version := "0.1"
scalaVersion := "2.12.11"
libraryDependencies += "org.apache.flink" % "flink-table-common" % "1.11.2" % "provided"
assembly.sbt:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
Scala 项目结构:
hello_scala_for_flink/
project/
src/
main/
resources/
scala/
com.dummy/
dummyTransform
hello_scala
test
target/
build.sbt
shell:
cd hello_scala_for_flink
sbt assembly
cp ./target/scala-2.12/hello_scala_for_flink-assembly-0.1.jar /Users/py-r/opt/anaconda3/envs/venv_pyflink_37/lib/python3.7/site-packages/pyflink/lib/
Python:
from pyflink.dataset import ExecutionEnvironment
exec_env = ExecutionEnvironment.get_execution_environment()
#exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
table_env = BatchTableEnvironment.create(exec_env, t_config)
table_env.register_java_function("hash_code","com.dummy.dummyTransform")
错误(包括重启 Anaconda 之后):
Py4JJavaError: An error occurred while calling o12.newInstance.
: java.lang.InstantiationException: com.dummy.dummyTransform
at java.base/java.lang.Class.newInstance(Class.java:598)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.NoSuchMethodException: com.dummy.dummyTransform.<init>()
at java.base/java.lang.Class.getConstructor0(Class.java:3427)
at java.base/java.lang.Class.newInstance(Class.java:585)
11 more
版本:
jdk = 1.8.0_151.jdk
scala = 2.12.11
python = 3.7
apache-beam = 2.19.0
apache-flink = 1.11.2
知道问题出在哪里吗?
感谢您的支持
看来我自己发现了问题。显然上面的代码只需要 class 实例化:
class dummyTransform(factor: Int) extends ScalarFunction {
def eval(s: String): Int = {
s.hashCode() * factor
}
def this() = this(1)
}
此外,由于另一个错误,我更改为 Scala 2.11.12。现在一切似乎都正常了:太棒了!