Riak mapReduce 因超过 15 条记录而失败
Riak mapReduce fails with > 15 records
问题
我一直在学习 Riak,运行 遇到了 mapReduce 的问题。当有 15 条记录时,我的 mapReduce 函数工作正常,但在那之后,它会抛出堆栈跟踪错误。我是 Riak 和 Erlang 的新手,所以我不确定这是我的代码还是 Riak。任何有关如何调试此问题或问题的建议!
代码
地图
-module(identity_map).
-export([identity/3]).
identity(Values, _, _) ->
JsonData = riak_object:get_values(Values),
{struct, Objects} = mochijson2:decode(JsonData),
[Objects].
减少
-module(stocks_summary).
-export([average_high/2]).
% Returns values from a map phase
average_high(Values, _) ->
Total = lists:foldl(
fun(Record, Accum) ->
High = proplists:get_value(<<"high_d">>, Record),
Accum + High
end,
0,
Values),
[Total / length(Values)].
调用
curl -XPOST http://192.168.0.126:8098/mapred \
-H 'Content-Type: application/json' \
-d '{"inputs": ["stocks","goog"], "query": [{"map":{"language":"erlang","module":"identity_map","function":"identity"}}, {"reduce":{"language":"erlang","module":"stocks_summary","function":"average_high"}} ]}'
数据
Date,Open,High,Low,Close,Volume,Adj Close
2010-05-05,500.98,515.72,500.47,509.76,4566900,509.76
2010-05-04,526.52,526.74,504.21,506.37,6076300,506.37
2010-05-03,526.50,532.92,525.08,530.60,1857800,530.60
2010-04-30,531.13,537.68,525.44,525.70,2435400,525.70
2010-04-29,533.37,536.50,526.67,532.00,3058900,532.00
2010-04-28,532.10,534.83,521.03,529.19,3406100,529.19
2010-04-27,528.95,538.33,527.23,529.06,3844700,529.06
2010-04-26,544.97,544.99,529.21,531.64,4368800,531.64
2010-04-23,547.25,549.32,542.27,544.99,2089400,544.99
2010-04-22,552.00,552.50,543.35,547.06,3280700,547.06
2010-04-21,556.46,560.25,552.16,554.30,2391500,554.30
2010-04-20,554.17,559.66,551.06,555.04,2977400,555.04
2010-04-19,548.75,553.99,545.00,550.10,3894000,550.10
2010-04-16,563.00,568.81,549.63,550.15,12235500,550.15
2010-04-15,592.17,597.84,588.29,595.30,6716700,595.30
2010-04-14,590.06,592.34,584.01,589.00,3402700,589.00
2010-04-13,572.53,588.88,571.13,586.77,3845200,586.77
2010-04-12,567.35,574.00,566.22,572.73,2352400,572.73
2010-04-09,567.49,568.77,564.00,566.22,2056600,566.22
2010-04-08,563.32,569.85,560.05,567.49,1947500,567.49
2010-04-07,567.30,568.75,561.86,563.54,2581000,563.54
样本值
{
"date_s": "2010-04-07",
"open_d": 567.3,
"high_d": 568.75,
"low_d": 561.86,
"close_d": 563.54,
"volume_i": 2581000,
"adjClose_d": 563.54
}
堆栈跟踪
{
"phase": 1,
"error": "{function_clause,[{proplists,get_value,[<<\"high_d\">>,555.621,undefined],[{file,\"proplists.erl\"},{line,225}]},{stocks_summary,'-average_high/2-fun-0-',2,[{file,\"stocks_summary.erl\"},{line,9}]},{lists,foldl,3,[{file,\"lists.erl\"},{line,1248}]},{stocks_summary,average_high,2,[{file,\"stocks_summary.erl\"},{line,7}]},{riak_kv_w_reduce,reduce,3,[{file,\"src/riak_kv_w_reduce.erl\"},{line,207}]},{riak_kv_w_reduce,done,1,[{file,\"src/riak_kv_w_reduce.erl\"},{line,170}]},{riak_pipe_vnode_worker,wait_for_input,...},...]}",
"input": null,
"type": null,
"stack": null
}
假设
从堆栈跟踪来看,reduce 似乎被应用于一个值而不是元组列表,但是导致这个问题的原因是 运行ge 是因为当我只放置 10-15 条记录时放入桶中,效果很好。
问题出在减少阶段。 map 阶段分布在集群周围并由许多 vnode 执行,这些 vnode 将 map 阶段结果转发到将 运行 减少 reduce 的节点。由于这些预计不会同时到达,reduce 阶段函数可能 运行 多次,其在后续 运行s 上的输入是先前 reduce 结果和新到达的 map 阶段结果的串联。
这意味着在第二个 运行 上,您的 reduce 函数接收前一个 avarage 作为普通数字作为列表中的第一个元素,列表的其余部分是 json objects/proplists你所期望的。
要解决此问题,请让您的 reduce 函数 return 包含当前平均值和到目前为止所见值数量的 proplist。下面是一种可能性,但此示例将 return 您的 MapReduce 最终结果作为 object/proplist 而不是数字。
average_high(Values, _) ->
{Count,Total} = lists:foldl(
fun(Record, {Cnt,Tot}) ->
case proplists:get_value(<<"average">>,Record,undefined) of
undefined ->
High = proplists:get_value(<<"high_d">>, Record),
{Cnt+1,Tot + High};
Ave ->
C = proplists:get_value(<<"count">>, Record, 1),
{Cnt + C, Tot + (Ave * C)}
end
end,
{0,0},
Values),
[[{<<"average">>,Total/Count},{<<"count">>,Count}]].
A reduce function should produce a list of values, but it must also be
true that the function is commutative, associative, and idempotent.
That is, if the input list [a,b,c,d] is valid for a given F, then all
of the following must produce the same result:
F([a,b,c,d])
F([a,d] ++ F([c,b]))
F([F([a]),F([c]),F([b]),F([d])])
问题
我一直在学习 Riak,运行 遇到了 mapReduce 的问题。当有 15 条记录时,我的 mapReduce 函数工作正常,但在那之后,它会抛出堆栈跟踪错误。我是 Riak 和 Erlang 的新手,所以我不确定这是我的代码还是 Riak。任何有关如何调试此问题或问题的建议!
代码
地图
-module(identity_map).
-export([identity/3]).
identity(Values, _, _) ->
JsonData = riak_object:get_values(Values),
{struct, Objects} = mochijson2:decode(JsonData),
[Objects].
减少
-module(stocks_summary).
-export([average_high/2]).
% Returns values from a map phase
average_high(Values, _) ->
Total = lists:foldl(
fun(Record, Accum) ->
High = proplists:get_value(<<"high_d">>, Record),
Accum + High
end,
0,
Values),
[Total / length(Values)].
调用
curl -XPOST http://192.168.0.126:8098/mapred \
-H 'Content-Type: application/json' \
-d '{"inputs": ["stocks","goog"], "query": [{"map":{"language":"erlang","module":"identity_map","function":"identity"}}, {"reduce":{"language":"erlang","module":"stocks_summary","function":"average_high"}} ]}'
数据
Date,Open,High,Low,Close,Volume,Adj Close
2010-05-05,500.98,515.72,500.47,509.76,4566900,509.76
2010-05-04,526.52,526.74,504.21,506.37,6076300,506.37
2010-05-03,526.50,532.92,525.08,530.60,1857800,530.60
2010-04-30,531.13,537.68,525.44,525.70,2435400,525.70
2010-04-29,533.37,536.50,526.67,532.00,3058900,532.00
2010-04-28,532.10,534.83,521.03,529.19,3406100,529.19
2010-04-27,528.95,538.33,527.23,529.06,3844700,529.06
2010-04-26,544.97,544.99,529.21,531.64,4368800,531.64
2010-04-23,547.25,549.32,542.27,544.99,2089400,544.99
2010-04-22,552.00,552.50,543.35,547.06,3280700,547.06
2010-04-21,556.46,560.25,552.16,554.30,2391500,554.30
2010-04-20,554.17,559.66,551.06,555.04,2977400,555.04
2010-04-19,548.75,553.99,545.00,550.10,3894000,550.10
2010-04-16,563.00,568.81,549.63,550.15,12235500,550.15
2010-04-15,592.17,597.84,588.29,595.30,6716700,595.30
2010-04-14,590.06,592.34,584.01,589.00,3402700,589.00
2010-04-13,572.53,588.88,571.13,586.77,3845200,586.77
2010-04-12,567.35,574.00,566.22,572.73,2352400,572.73
2010-04-09,567.49,568.77,564.00,566.22,2056600,566.22
2010-04-08,563.32,569.85,560.05,567.49,1947500,567.49
2010-04-07,567.30,568.75,561.86,563.54,2581000,563.54
样本值
{
"date_s": "2010-04-07",
"open_d": 567.3,
"high_d": 568.75,
"low_d": 561.86,
"close_d": 563.54,
"volume_i": 2581000,
"adjClose_d": 563.54
}
堆栈跟踪
{
"phase": 1,
"error": "{function_clause,[{proplists,get_value,[<<\"high_d\">>,555.621,undefined],[{file,\"proplists.erl\"},{line,225}]},{stocks_summary,'-average_high/2-fun-0-',2,[{file,\"stocks_summary.erl\"},{line,9}]},{lists,foldl,3,[{file,\"lists.erl\"},{line,1248}]},{stocks_summary,average_high,2,[{file,\"stocks_summary.erl\"},{line,7}]},{riak_kv_w_reduce,reduce,3,[{file,\"src/riak_kv_w_reduce.erl\"},{line,207}]},{riak_kv_w_reduce,done,1,[{file,\"src/riak_kv_w_reduce.erl\"},{line,170}]},{riak_pipe_vnode_worker,wait_for_input,...},...]}",
"input": null,
"type": null,
"stack": null
}
假设
从堆栈跟踪来看,reduce 似乎被应用于一个值而不是元组列表,但是导致这个问题的原因是 运行ge 是因为当我只放置 10-15 条记录时放入桶中,效果很好。
问题出在减少阶段。 map 阶段分布在集群周围并由许多 vnode 执行,这些 vnode 将 map 阶段结果转发到将 运行 减少 reduce 的节点。由于这些预计不会同时到达,reduce 阶段函数可能 运行 多次,其在后续 运行s 上的输入是先前 reduce 结果和新到达的 map 阶段结果的串联。
这意味着在第二个 运行 上,您的 reduce 函数接收前一个 avarage 作为普通数字作为列表中的第一个元素,列表的其余部分是 json objects/proplists你所期望的。
要解决此问题,请让您的 reduce 函数 return 包含当前平均值和到目前为止所见值数量的 proplist。下面是一种可能性,但此示例将 return 您的 MapReduce 最终结果作为 object/proplist 而不是数字。
average_high(Values, _) ->
{Count,Total} = lists:foldl(
fun(Record, {Cnt,Tot}) ->
case proplists:get_value(<<"average">>,Record,undefined) of
undefined ->
High = proplists:get_value(<<"high_d">>, Record),
{Cnt+1,Tot + High};
Ave ->
C = proplists:get_value(<<"count">>, Record, 1),
{Cnt + C, Tot + (Ave * C)}
end
end,
{0,0},
Values),
[[{<<"average">>,Total/Count},{<<"count">>,Count}]].
A reduce function should produce a list of values, but it must also be true that the function is commutative, associative, and idempotent. That is, if the input list [a,b,c,d] is valid for a given F, then all of the following must produce the same result:
F([a,b,c,d]) F([a,d] ++ F([c,b])) F([F([a]),F([c]),F([b]),F([d])])