Java 中的 Hive UDF 在创建 table 时失败
Hive UDF in Java fails when creating a table
这两个查询有什么区别:
SELECT my_fun(col_name) FROM my_table;
和
CREATE TABLE new_table AS SELECT my_fun(col_name) FROM my_table;
其中 my_fun 是一个 java UDF。
我在问,因为当我创建新的 table(第二个查询)时,我收到一个 java 错误。
Failure while running task:java.lang.RuntimeException: java.lang.RuntimeException: Map operator initialization failed
...
Caused by: org.apache.hadoop.hive.ql.exec.UDFArgumentException: Unable to instantiate UDF implementation class com.company_name.examples.ExampleUDF: java.lang.NullPointerException
我发现错误来源是我的 java 文件中的行:
encoded = Files.readAllBytes(Paths.get(configPath));
但问题是为什么它在未创建 table 时有效,而在创建 table 时失败?
问题可能出在您读取文件的方式上。尝试将文件路径作为UDF中的第二个参数传递,然后读取如下
private BufferedReader getReaderFor(String filePath) throws HiveException {
try {
Path fullFilePath = FileSystems.getDefault().getPath(filePath);
Path fileName = fullFilePath.getFileName();
if (Files.exists(fileName)) {
return Files.newBufferedReader(fileName, Charset.defaultCharset());
}
else
if (Files.exists(fullFilePath)) {
return Files.newBufferedReader(fullFilePath, Charset.defaultCharset());
}
else {
throw new HiveException("Could not find \"" + fileName + "\" or \"" + fullFilePath + "\" in inersect_file() UDF.");
}
}
catch(IOException exception) {
throw new HiveException(exception);
}
}
private void loadFromFile(String filePath) throws HiveException {
set = new HashSet<String>();
try (BufferedReader reader = getReaderFor(filePath)) {
String line;
while((line = reader.readLine()) != null) {
set.add(line);
}
} catch (IOException e) {
throw new HiveException(e);
}
}
可以找到使用文件 reader 的不同通用 UDF 的完整代码 here
我觉得有几点不清楚,所以这个答案是基于假设的。
首先,重要的是要了解 Hive 当前优化了几个简单的查询,并且根据您的数据大小,适合您的查询 SELECT my_fun(col_name) FROM my_table;
最有可能 运行从您正在执行作业的客户端本地,这就是为什么您的 UDF 可以访问本地可用的配置文件,这种“执行模式”是因为您的数据大小。 CTAS 触发独立于输入数据的作业,该作业在集群中分布式运行,每个工作人员都无法访问您的配置文件。
看起来您正在尝试从本地文件系统读取配置文件,而不是从 HDSFS Files.readAllBytes(Paths.get(configPath))
,这意味着您的配置必须在所有工作节点中复制或添加以前到分布式缓存(你可以使用从这里添加文件,文档 here. You can find another questions 关于从 UDF 访问分布式缓存中的文件。
另一个问题是您通过环境变量传递配置文件的位置,该环境变量不会作为配置单元作业的一部分传播到工作节点。您应该将此配置作为配置单元配置传递,假设您正在扩展 GenericUDF,则有一个从 UDF here 访问 Hive Config 的答案。
这两个查询有什么区别:
SELECT my_fun(col_name) FROM my_table;
和
CREATE TABLE new_table AS SELECT my_fun(col_name) FROM my_table;
其中 my_fun 是一个 java UDF。
我在问,因为当我创建新的 table(第二个查询)时,我收到一个 java 错误。
Failure while running task:java.lang.RuntimeException: java.lang.RuntimeException: Map operator initialization failed
...
Caused by: org.apache.hadoop.hive.ql.exec.UDFArgumentException: Unable to instantiate UDF implementation class com.company_name.examples.ExampleUDF: java.lang.NullPointerException
我发现错误来源是我的 java 文件中的行:
encoded = Files.readAllBytes(Paths.get(configPath));
但问题是为什么它在未创建 table 时有效,而在创建 table 时失败?
问题可能出在您读取文件的方式上。尝试将文件路径作为UDF中的第二个参数传递,然后读取如下
private BufferedReader getReaderFor(String filePath) throws HiveException {
try {
Path fullFilePath = FileSystems.getDefault().getPath(filePath);
Path fileName = fullFilePath.getFileName();
if (Files.exists(fileName)) {
return Files.newBufferedReader(fileName, Charset.defaultCharset());
}
else
if (Files.exists(fullFilePath)) {
return Files.newBufferedReader(fullFilePath, Charset.defaultCharset());
}
else {
throw new HiveException("Could not find \"" + fileName + "\" or \"" + fullFilePath + "\" in inersect_file() UDF.");
}
}
catch(IOException exception) {
throw new HiveException(exception);
}
}
private void loadFromFile(String filePath) throws HiveException {
set = new HashSet<String>();
try (BufferedReader reader = getReaderFor(filePath)) {
String line;
while((line = reader.readLine()) != null) {
set.add(line);
}
} catch (IOException e) {
throw new HiveException(e);
}
}
可以找到使用文件 reader 的不同通用 UDF 的完整代码 here
我觉得有几点不清楚,所以这个答案是基于假设的。
首先,重要的是要了解 Hive 当前优化了几个简单的查询,并且根据您的数据大小,适合您的查询 SELECT my_fun(col_name) FROM my_table;
最有可能 运行从您正在执行作业的客户端本地,这就是为什么您的 UDF 可以访问本地可用的配置文件,这种“执行模式”是因为您的数据大小。 CTAS 触发独立于输入数据的作业,该作业在集群中分布式运行,每个工作人员都无法访问您的配置文件。
看起来您正在尝试从本地文件系统读取配置文件,而不是从 HDSFS Files.readAllBytes(Paths.get(configPath))
,这意味着您的配置必须在所有工作节点中复制或添加以前到分布式缓存(你可以使用从这里添加文件,文档 here. You can find another questions
另一个问题是您通过环境变量传递配置文件的位置,该环境变量不会作为配置单元作业的一部分传播到工作节点。您应该将此配置作为配置单元配置传递,假设您正在扩展 GenericUDF,则有一个从 UDF here 访问 Hive Config 的答案。