Pig UDF,分布式缓存中的文件在批处理过程中被删除
Pig UDF, file in Distributed Cache deleted during batch work
public class GetCountryFromIP extends EvalFunc<String> {
@Override
public List<String> getCacheFiles() {
List<String> list = new ArrayList<String>(1);
list.add("/input/pig/resources/GeoLite2-Country.mmdb#GeoLite2-Country");
return list;
}
@Override
public String exec(Tuple input) throws IOException {
if (input == null || input.size() == 0 || input.get(0) == null) {
return null;
}
try {
String inputIP = (String) input.get(0);
String output;
File database = new File("./GeoLite2-Country");
//CODE FOR EXPLAIN
if (database.exists()) {
System.out.print("EXIST!!!");
} else {
System.out.print("NOTEXISTS!!!");
}
//CODE FOR EXPLAIN
DatabaseReader reader = new DatabaseReader.Builder(database).build();
InetAddress ipAddress = InetAddress.getByName(inputIP);
CountryResponse response = reader.country(ipAddress);
Country country = response.getCountry();
output = country.getIsoCode();
return output;
} catch (AddressNotFoundException e) {
return null;
} catch (Exception ee) {
throw new IOException("Uncaught exec" + ee);
}
}
}
这是我的 UDF 代码,我需要 GeoLite2-Count.mmdb 文件,所以使用 GetCacheFile。
我还把所有的 Pig-Latin 放到一个 pig 文件中,'batch.pig'
当我运行这个文件'pig batch.pig'
像这样输出接缝
2015-10-06 01:16:56,737 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - soft limit at 83886080
2015-10-06 01:16:56,737 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - bufstart = 0; bufvoid = 104857600
2015-10-06 01:16:56,737 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - kvstart = 26214396; length = 6553600
2015-10-06 01:16:56,738 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2015-10-06 01:16:56,744 [LocalJobRunner Map Task Executor #0] WARN org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
2015-10-06 01:16:56,754 [LocalJobRunner Map Task Executor #0] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Map - Aliases being processed per job phase (AliasName[line,offset]): M: weblog[-1,-1],weblog_web[30,13],weblog_web[-1,-1],weblog_web[-1,-1],desktop_active_log_account_filter[7,36],desktop_parsed[3,18],desktop_parsed_abstract[5,26],weblog_web[-1,-1],web_active_log_account_filter[20,32],weblog_web_parsed[16,20],weblog_web_parsed_abstract[18,29] C: R:
EXIST!!!
2015-10-06 01:16:56,997 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.LocalJobRunner -
2015-10-06 01:16:56,997 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - Starting flush of map output
...
...
...
2015-10-06 01:16:57,938 [Thread-1885] INFO org.apache.hadoop.mapred.LocalJobRunner - reduce task executor complete.
2015-10-06 01:16:57,939 [pool-59-thread-1] WARN org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
NOTEXIST!!!
2015-10-06 01:16:57,974 [pool-59-thread-1] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce - Aliases being processed per job phase (AliasName[line,offset]): M: account_hour_activity[42,24],account_hour_activity_group[41,30],team_hour_activity[76,21],team_hour_activity_group[75,27] C:
...
...
..
2015-10-06 01:16:57,976 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - Finished spill 0
2015-10-06 01:16:57,977 [Thread-2139] INFO org.apache.hadoop.mapred.LocalJobRunner - map task executor complete.
2015-10-06 01:16:57,981 [Thread-2139] WARN org.apache.hadoop.mapred.LocalJobRunner - job_local1209692101_0021
java.lang.Exception: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: Local Rearrange[tuple]{tuple}(true) - scope-2240 Operator Key: scope-2240): org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: weblog_web_parsed_abstract: New For Each(false,false,false)[bag] - scope-1379 Operator Key: scope-1379): org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: com.tosslab.sprinklr.country.GetCountryFromIP [Uncaught execjava.io.FileNotFoundException: ./GeoLite2-Country (No such file or directory)]
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: Local Rearrange[tuple]{tuple}(true) - scope-2240 Operator Key: scope-2240): org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: weblog_web_parsed_abstract: New For Each(false,false,false)[bag] - scope-1379 Operator Key: scope-1379): org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: com.tosslab.sprinklr.country.GetCountryFromIP [Uncaught execjava.io.FileNotFoundException: ./GeoLite2-Country (No such file or directory)]
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:316)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNextTuple(POLocalRearrange.java:291)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.runPipeline(POSplit.java:259)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.processPlan(POSplit.java:241)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.processPlan(POSplit.java:246)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.getNextTuple(POSplit.java:233)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:283)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:278)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: weblog_web_parsed_abstract: New For Each(false,false,false)[bag] - scope-1379 Operator Key: scope-1379): org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: com.tosslab.sprinklr.country.GetCountryFromIP [Uncaught execjava.io.FileNotFoundException: ./GeoLite2-Country (No such file or directory)]
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:316)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:246)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:307)
... 17 more
这意味着 mmdb 文件在批处理过程中被删除...
这是怎么回事?我该如何解决这个问题?
作业似乎是 运行 来自本地模式。
2015-10-06 01:16:57,976 [**LocalJobRunner** Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - Finished spill 0
当您 运行 在本地模式下作业时,不支持分布式缓存。
2015-10-05 23:22:56,675 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Distributed cache not supported or needed in local mode.
将所有内容放在 HDFS 中,运行 采用 mapreduce 模式。
public class GetCountryFromIP extends EvalFunc<String> {
@Override
public List<String> getCacheFiles() {
List<String> list = new ArrayList<String>(1);
list.add("/input/pig/resources/GeoLite2-Country.mmdb#GeoLite2-Country");
return list;
}
@Override
public String exec(Tuple input) throws IOException {
if (input == null || input.size() == 0 || input.get(0) == null) {
return null;
}
try {
String inputIP = (String) input.get(0);
String output;
File database = new File("./GeoLite2-Country");
//CODE FOR EXPLAIN
if (database.exists()) {
System.out.print("EXIST!!!");
} else {
System.out.print("NOTEXISTS!!!");
}
//CODE FOR EXPLAIN
DatabaseReader reader = new DatabaseReader.Builder(database).build();
InetAddress ipAddress = InetAddress.getByName(inputIP);
CountryResponse response = reader.country(ipAddress);
Country country = response.getCountry();
output = country.getIsoCode();
return output;
} catch (AddressNotFoundException e) {
return null;
} catch (Exception ee) {
throw new IOException("Uncaught exec" + ee);
}
}
}
这是我的 UDF 代码,我需要 GeoLite2-Count.mmdb 文件,所以使用 GetCacheFile。
我还把所有的 Pig-Latin 放到一个 pig 文件中,'batch.pig'
当我运行这个文件'pig batch.pig'
像这样输出接缝
2015-10-06 01:16:56,737 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - soft limit at 83886080
2015-10-06 01:16:56,737 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - bufstart = 0; bufvoid = 104857600
2015-10-06 01:16:56,737 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - kvstart = 26214396; length = 6553600
2015-10-06 01:16:56,738 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2015-10-06 01:16:56,744 [LocalJobRunner Map Task Executor #0] WARN org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
2015-10-06 01:16:56,754 [LocalJobRunner Map Task Executor #0] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Map - Aliases being processed per job phase (AliasName[line,offset]): M: weblog[-1,-1],weblog_web[30,13],weblog_web[-1,-1],weblog_web[-1,-1],desktop_active_log_account_filter[7,36],desktop_parsed[3,18],desktop_parsed_abstract[5,26],weblog_web[-1,-1],web_active_log_account_filter[20,32],weblog_web_parsed[16,20],weblog_web_parsed_abstract[18,29] C: R:
EXIST!!!
2015-10-06 01:16:56,997 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.LocalJobRunner -
2015-10-06 01:16:56,997 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - Starting flush of map output
...
...
...
2015-10-06 01:16:57,938 [Thread-1885] INFO org.apache.hadoop.mapred.LocalJobRunner - reduce task executor complete.
2015-10-06 01:16:57,939 [pool-59-thread-1] WARN org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
NOTEXIST!!!
2015-10-06 01:16:57,974 [pool-59-thread-1] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce - Aliases being processed per job phase (AliasName[line,offset]): M: account_hour_activity[42,24],account_hour_activity_group[41,30],team_hour_activity[76,21],team_hour_activity_group[75,27] C:
...
...
..
2015-10-06 01:16:57,976 [LocalJobRunner Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - Finished spill 0
2015-10-06 01:16:57,977 [Thread-2139] INFO org.apache.hadoop.mapred.LocalJobRunner - map task executor complete.
2015-10-06 01:16:57,981 [Thread-2139] WARN org.apache.hadoop.mapred.LocalJobRunner - job_local1209692101_0021
java.lang.Exception: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: Local Rearrange[tuple]{tuple}(true) - scope-2240 Operator Key: scope-2240): org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: weblog_web_parsed_abstract: New For Each(false,false,false)[bag] - scope-1379 Operator Key: scope-1379): org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: com.tosslab.sprinklr.country.GetCountryFromIP [Uncaught execjava.io.FileNotFoundException: ./GeoLite2-Country (No such file or directory)]
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: Local Rearrange[tuple]{tuple}(true) - scope-2240 Operator Key: scope-2240): org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: weblog_web_parsed_abstract: New For Each(false,false,false)[bag] - scope-1379 Operator Key: scope-1379): org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: com.tosslab.sprinklr.country.GetCountryFromIP [Uncaught execjava.io.FileNotFoundException: ./GeoLite2-Country (No such file or directory)]
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:316)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNextTuple(POLocalRearrange.java:291)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.runPipeline(POSplit.java:259)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.processPlan(POSplit.java:241)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.processPlan(POSplit.java:246)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.getNextTuple(POSplit.java:233)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:283)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:278)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: weblog_web_parsed_abstract: New For Each(false,false,false)[bag] - scope-1379 Operator Key: scope-1379): org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: com.tosslab.sprinklr.country.GetCountryFromIP [Uncaught execjava.io.FileNotFoundException: ./GeoLite2-Country (No such file or directory)]
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:316)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:246)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:307)
... 17 more
这意味着 mmdb 文件在批处理过程中被删除...
这是怎么回事?我该如何解决这个问题?
作业似乎是 运行 来自本地模式。
2015-10-06 01:16:57,976 [**LocalJobRunner** Map Task Executor #0] INFO org.apache.hadoop.mapred.MapTask - Finished spill 0
当您 运行 在本地模式下作业时,不支持分布式缓存。
2015-10-05 23:22:56,675 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Distributed cache not supported or needed in local mode.
将所有内容放在 HDFS 中,运行 采用 mapreduce 模式。