Hive 交叉连接在本地地图连接上失败
Hive cross join fails on local map join
是否有直接的方法来解决以下错误或整体上使用 Hive 获得我需要的连接的更好方法?输出到存储的 table 不是必需的,因为我可以满足于 INSERT OVERWRITE LOCAL DIRECTORY
到 csv.
我正在尝试执行以下交叉连接。 ipint 是 9GB table,geoiplite 是 270MB。
CREATE TABLE iplatlong_sample AS
SELECT ipintegers.networkinteger, geoiplite.latitude, geoiplite.longitude
FROM geoiplite
CROSS JOIN ipintegers
WHERE ipintegers.networkinteger >= geoiplite.network_start_integer AND ipintegers.networkinteger <= geoiplite.network_last_integer;
我在 ipintegers 上使用 CROSS JOIN 而不是 geoiplite,因为我读到规则是较小的 table 在左边,较大的在右边。
根据 HIVE,Map 和 Reduce 阶段已完成 100%,但随后
2015-08-01 04:45:36,947 Stage-1 map = 100%, reduce = 100%, Cumulative
CPU 8767.09 sec
MapReduce Total cumulative CPU time: 0 days 2 hours 26
minutes 7 seconds 90 msec
Ended Job = job_201508010407_0001
Stage-8 is selected by condition resolver.
Execution log at: /tmp/myuser/.log
2015-08-01 04:45:38 Starting to launch local task to process map
join; maximum memory = 12221153280
Execution failed with exit status: 3
Obtaining error information
Task failed!
Task ID: Stage-8
Logs:
/tmp/myuser/hive.log
FAILED: Execution Error, return code 3 from
org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
MapReduce Jobs
Launched: Job 0: Map: 38 Reduce: 1 Cumulative CPU: 8767.09 sec
HDFS Read: 9438495086 HDFS Write: 8575548486 SUCCESS
我的蜂巢配置:
SET hive.mapred.local.mem=40960;
SET hive.exec.parallel=true;
SET hive.exec.compress.output=true;
SET hive.exec.compress.intermediate = true;
SET hive.optimize.skewjoin = true;
SET mapred.compress.map.output=true;
SET hive.stats.autogather=false;
我在 true 和 false 之间改变 SET hive.auto.convert.join
但结果相同。
这是 /tmp/myuser/hive.log
输出日志中的错误
$ tail -12 -f tmp/mysyer/hive.log
2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Execution failed with exit status: 3
2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Obtaining error information
2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) -
Task failed!
Task ID:
Stage-8
Logs:
2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) - /tmp/myuser/hive.log
2015-08-01 07:30:46,087 ERROR mr.MapredLocalTask (MapredLocalTask.java:execute(268)) - Execution failed with exit status: 3
2015-08-01 07:30:46,094 ERROR ql.Driver (SessionState.java:printError(419)) - FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
我是运行 Master 上的hive client,一个Google Cloud Platform instance of type n1-highmem-8 type (8 CPU, 52GB) and workers 是n1 -highmem-4 (4CPU 26GB),但我怀疑在 MAP 和 REDUCE 之后本地连接(隐含的)发生在 Master 上。无论如何,在 bdutils 中,我将工作节点 (n1-highmem-4
) 的 JAVAOPTS 配置为:n1-highmem-4
解决方案编辑:解决方案是将范围数据组织到范围树中。
我认为不可能执行这种交叉连接蛮力 - 只需乘以行号,这有点失控。你需要一些优化,我认为 hive 还没有能力。
但是这个问题真的可以在 O(N1+N2) 时间内解决,前提是您对数据进行了排序(hive 可以为您做)——您只需同时浏览两个列表,在每一步获取一个 ip整数,查看是否有任何间隔从这个整数开始,添加它们,删除那些结束的,发出匹配的元组,等等。伪代码:
intervals=[]
ipintegers = iterator(ipintegers_sorted_file)
intervals = iterator(intervals_sorted_on_start_file)
for x in ipintegers:
intervals = [i for i in intervals if i.end >= x]
while(intervals.current.start<=x):
intervals.append(intervals.current)
intervals.next()
for i in intervals:
output_match(i, x)
现在,如果你有一个外部 script/UDF 函数知道如何读取较小的 table 并获取 ip 整数作为输入并吐出匹配的元组作为输出,你可以使用 hive 和 SELECT TRANSFORM
将输入流式传输到它。
或者您可以 运行 在具有两个输入文件的本地计算机上使用此算法,因为这只是 O(N),甚至 9 gb 的数据也是可行的。
是否有直接的方法来解决以下错误或整体上使用 Hive 获得我需要的连接的更好方法?输出到存储的 table 不是必需的,因为我可以满足于 INSERT OVERWRITE LOCAL DIRECTORY
到 csv.
我正在尝试执行以下交叉连接。 ipint 是 9GB table,geoiplite 是 270MB。
CREATE TABLE iplatlong_sample AS
SELECT ipintegers.networkinteger, geoiplite.latitude, geoiplite.longitude
FROM geoiplite
CROSS JOIN ipintegers
WHERE ipintegers.networkinteger >= geoiplite.network_start_integer AND ipintegers.networkinteger <= geoiplite.network_last_integer;
我在 ipintegers 上使用 CROSS JOIN 而不是 geoiplite,因为我读到规则是较小的 table 在左边,较大的在右边。
根据 HIVE,Map 和 Reduce 阶段已完成 100%,但随后
2015-08-01 04:45:36,947 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 8767.09 sec
MapReduce Total cumulative CPU time: 0 days 2 hours 26 minutes 7 seconds 90 msec
Ended Job = job_201508010407_0001
Stage-8 is selected by condition resolver.
Execution log at: /tmp/myuser/.log
2015-08-01 04:45:38 Starting to launch local task to process map join; maximum memory = 12221153280
Execution failed with exit status: 3
Obtaining error information
Task failed!
Task ID: Stage-8
Logs:
/tmp/myuser/hive.log
FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
MapReduce Jobs Launched: Job 0: Map: 38 Reduce: 1 Cumulative CPU: 8767.09 sec
HDFS Read: 9438495086 HDFS Write: 8575548486 SUCCESS
我的蜂巢配置:
SET hive.mapred.local.mem=40960;
SET hive.exec.parallel=true;
SET hive.exec.compress.output=true;
SET hive.exec.compress.intermediate = true;
SET hive.optimize.skewjoin = true;
SET mapred.compress.map.output=true;
SET hive.stats.autogather=false;
我在 true 和 false 之间改变 SET hive.auto.convert.join
但结果相同。
这是 /tmp/myuser/hive.log
输出日志中的错误$ tail -12 -f tmp/mysyer/hive.log
2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Execution failed with exit status: 3
2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Obtaining error information
2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) -
Task failed!
Task ID:
Stage-8
Logs:
2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) - /tmp/myuser/hive.log
2015-08-01 07:30:46,087 ERROR mr.MapredLocalTask (MapredLocalTask.java:execute(268)) - Execution failed with exit status: 3
2015-08-01 07:30:46,094 ERROR ql.Driver (SessionState.java:printError(419)) - FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
我是运行 Master 上的hive client,一个Google Cloud Platform instance of type n1-highmem-8 type (8 CPU, 52GB) and workers 是n1 -highmem-4 (4CPU 26GB),但我怀疑在 MAP 和 REDUCE 之后本地连接(隐含的)发生在 Master 上。无论如何,在 bdutils 中,我将工作节点 (n1-highmem-4
) 的 JAVAOPTS 配置为:n1-highmem-4
解决方案编辑:解决方案是将范围数据组织到范围树中。
我认为不可能执行这种交叉连接蛮力 - 只需乘以行号,这有点失控。你需要一些优化,我认为 hive 还没有能力。
但是这个问题真的可以在 O(N1+N2) 时间内解决,前提是您对数据进行了排序(hive 可以为您做)——您只需同时浏览两个列表,在每一步获取一个 ip整数,查看是否有任何间隔从这个整数开始,添加它们,删除那些结束的,发出匹配的元组,等等。伪代码:
intervals=[]
ipintegers = iterator(ipintegers_sorted_file)
intervals = iterator(intervals_sorted_on_start_file)
for x in ipintegers:
intervals = [i for i in intervals if i.end >= x]
while(intervals.current.start<=x):
intervals.append(intervals.current)
intervals.next()
for i in intervals:
output_match(i, x)
现在,如果你有一个外部 script/UDF 函数知道如何读取较小的 table 并获取 ip 整数作为输入并吐出匹配的元组作为输出,你可以使用 hive 和 SELECT TRANSFORM
将输入流式传输到它。
或者您可以 运行 在具有两个输入文件的本地计算机上使用此算法,因为这只是 O(N),甚至 9 gb 的数据也是可行的。