调用自定义 spark UDF 时如何解决 java 中的 NoSuchMethodException

How to resolve NoSuchMethodException in java when calling custom spark UDF

我有一个 java spark 流应用程序(使用 spark 3.0.1),我需要在其中调用流数据的自定义转换。这些自定义转换在 class 中定义为方法,并作为 jar 文件提供给我们。我们需要将这些方法包装在 UDF 中,并在我们的 spark 代码中调用它们。一组示例转换可以定义如下。请记住它是一个罐子。

import java.io.Serializable;

public class CustomTransformations implements Serializable {
    public String f1(String input) {
        return input + "_1";
    }

    public String f2(String input) {
        return input + "_2";
    }

    public String f3(String input) {
        return input + "_3";
    }
}

让我们假设在某个地方(例如 json 或配置文件)我们有一个转换映射和相应的方法名称(字符串),以便给定转换,我们可以将相应的方法包装在 UDF 中并调用它。为此,我创建了以下 class。

import java.lang.reflect.Method;

import static org.apache.spark.sql.functions.udf;

public class Creator {
    public static UserDefinedFunction getUDF(CustomTransformations ct, String funcName)
            throws NoSuchMethodException {
        Method method = ct.getClass().getDeclaredMethod(funcName);
        return udf(
                (UDF1<String, Object>) method::invoke, DataTypes.StringType);
    }
}

到目前为止没有编译错误。但现在的问题是,如果我从 spark 代码中调用此方法 getUDF,它会显示 NoSuchMethodException。例如。我的火花代码如下所示。

public class SampleSparkJob {
    public static void main(String[] args) {
        SparkSession.Builder sparkSessionBuilder = SparkSession.builder()
                .master("local[2]")
                .appName("sample-streaming");

        CustomTransformations ct = new CustomTransformations();
        try (SparkSession spark = sparkSessionBuilder.getOrCreate()) {
            Dataset<Row> df1 = MyKafkaConnectors.readFromKafka();

            // this is where I get the exceptions
            Dataset<Row> df2 = df1
                    .withColumn("value", Creator.getUDF(ct, "f1").apply(col("value")))
                    .withColumn("value", Creator.getUDF(ct, "f2").apply(col("value")))
                    .withColumn("value", Creator.getUDF(ct, "f3").apply(col("value")));

            StreamingQuery query = MyKafkaConnectors.WriteToKafka(df2);
            query.awaitTermination();
        } catch (TimeoutException | StreamingQueryException | NoSuchMethodException e) {
            e.printStackTrace();
        }
    }
}

这是我得到的错误:

java.lang.NoSuchMethodException: <pkgname>.CustomTransformations.f1()
    at java.base/java.lang.Class.getDeclaredMethod(Class.java:2475)
    at Creator.getUDF(Creator.java:14)
    at SampleSparkJob.main(SampleSparkJob.java:29)

包名正确。很明显,客户端的CustomTransformationsclass有一个方法f1。所以我无法理解为什么会显示此错误。感谢任何帮助。

您的代码有两个问题,都与 spark 无关。

首先,NoSuchMethodException:CustomTransformations.f1() 告诉您不存在不带任何参数 的方法f1 。这是真的。

您需要将参数类型指定为 getDeclaredMethod(在本例中为字符串)。

其次,一个方法不能用invoke单独调用,需要传递“owner”或“this”对象来调用。

然后 Creator 固定看起来像这样:

public class Creator {
    public static UserDefinedFunction getUDF(CustomTransformations ct, String funcName)
            throws NoSuchMethodException {
        Method method = ct.getClass().getDeclaredMethod(funcName, String.class);
        return udf(
                (UDF1<String, Object>) (s -> method.invoke(ct, s)), DataTypes.StringType);
    }
}

它会工作得很好。

对于@fonkap 给出的准确答案,我只有一点要补充。由于 java.lang.reflect.Method 不可序列化,我们需要绕过在 getUDF 方法中引用该对象。 Creator class 将类似于以下内容。

import static org.apache.spark.sql.functions.udf;

public class Creator implements Serializable {
    public static UserDefinedFunction getUDF(CustomTransformation ct, String funcName) {
        return udf((UDF1<String, Object>) (s -> ct.getClass().getDeclaredMethod(funcName,
                String.class).invoke(ct, s)),
                DataTypes.StringType);
    }            
}