Riak 的 erlang pb 客户端和地图获取
Riak's erlang pb client and map fetch
我正在尝试在 Riak 中存储寄存器和集合的映射,并使用 Riak 的 pb 客户端处理它们。我的目标是保存地图,然后从返回的对象中提取持久化的地图并对其进行处理(在本例中,从集合中提取寄存器和值并转换为 JSON)。似乎尝试使用 riakc_map:fetch 从返回的对象中提取值不是正确的方法??
这是我创建它的方式(键是一个 uuid,QP 是一个整数列表):
M = riak:new_map(),
M1 = riak:map_update({<<"post_id">>, register},
fun(R) -> riakc_register:set(Key, R) end, M),
M2 = riak:map_update({<<"userids">>, set},
fun(S) ->
[riakc_set:add_element(helpers:int_to_bin(Q), S) || Q <- QP]
end, M2),
[...]
然后我将它包装在一个对象中,并保存该对象。
Obj = riakc_obj:new(<<?POST_BUCKET>>, Key, Map) %% Map created via new_map()
{ok, Obj2} = riakc_pb_socket:put(Pid, <<?POST_BUCKET>>, Obj, [return_body])
现在这就是我 运行 遇到麻烦的地方:
Post = binary_to_term(riakc_obj:get_value(Obj2)));
Keys = riakc_map:fetch_keys(Post), %% returns []
OrigMap = riakc_map:value(Post), %% returns []
IsKey1 = riakc_map:is_key({<<"post_id">>, register}, Post), %% false
IsKey2 = riakc_map:is_key({<<"post_id">>}, Post), %% false
最重要的
PostId = riakc_map:fetch({<<"post_id">>, register}, Post),
死于
{function_clause, [{orddict,fetch, ...
Post 在 binary_to_term(riakc_obj:get_value(Obj2))); 之后,就我而言看起来是正确的可以说:
< Post = {map,[],
[{{<<"content">>,register},
{register,<<>>,<<"<post>Hello, World!\r\n</post>">>}},
{{<<"post_id">>,register},
{register,<<>>,<<"238e4300-a651-11e4-86c8-6003088f077a">>}},
{{<<"userids">>,set},
[{set,[],[<<"-1">>],[],undefined}]},
.....
非常感谢任何帮助!
有些地方不太对劲。此更新调用
M2 = riakc_map:update({<<"userids">>, set},
fun(S) ->
[riakc_set:add_element(helpers:int_to_bin(Q), S) || Q <- QP]
end, M1),
不会创建包含多个成员的集合,它会创建一个集合列表,每个集合都有一个成员。如果您尝试构建一组整数,请使用折叠而不是列表理解:
M2 = riakc_map:update({<<"userids">>, set},
fun(S) ->
lists:foldl(
fun(Q, Acc) ->
riakc_set:add_element(helpers:int_to_bin(Q), Acc)
end,
S, QP),
end, M1),
这实际上并没有更新地图的值,它在记录中分阶段更新:
#map{value = [],
updates = [{{<<"post_id">>,register},
{register,<<>>,<<"uuid">>}},
{{<<"userids">>,set},
{set,[],
[<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
<<"7">>,<<"8">>,<<"9">>],
[],undefined}}],
removes = [],context = undefined}
此时,riakc_map:value
returns []
因为那是记录中的值。要将分阶段更新应用到值,您需要调用 riakc_pb_socket:modify_type
,这也会将值存储在 Riak 中。
完整过程
准备 Riak 集群以使用地图 CRDT
创建存储地图的桶类型
root@node# riak-admin bucket-type create maps '{"props":{"datatype":"map"}}'
激活桶类型
root@node# riak-admin bucket-type activate maps
创建地图并更新 Riak
1> rr(riakc_map).
[map]
2> {ok,Pid}=riakc_pb_socket:start("127.0.0.1",8087),
2> M=riakc_map:new(),
2> Key = <<"testkey">>,
2> Bucket = <<"testbucket">>,
2> Type = <<"maps">>,
2> QP = lists:seq(1,10),
2> M1 = riakc_map:update({<<"post_id">>, register},
2> fun(R) -> riakc_register:set(Key, R) end, M),
2> M2 = riakc_map:update({<<"userids">>, set},
2> fun(S) ->
2> lists:foldl(fun(Q,Acc) -> riakc_set:add_element(list_to_binary(integer_to_list(Q)), Acc) end,S, QP)
2> end, M1),
2> riakc_pb_socket:modify_type(Pid,fun(_) -> M2 end,{Type,Bucket},Key,[create]).
ok
检索地图并检查值
3> {ok,Map} = riakc_pb_socket:fetch_type(Pid,{Type,Bucket},Key),
3> riakc_map:value(Map).
[{{<<"post_id">>,register},<<"testkey">>},
{{<<"userids">>,set},
[<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
<<"7">>,<<"8">>,<<"9">>]}]
4> riakc_map:fetch_keys(Map).
[{<<"post_id">>,register},{<<"userids">>,set}]
5> riakc_map:is_key({<<"post_id">>, register}, Map).
true
6> riakc_map:is_key({<<"post_id">>}, Map).
false
为了比较,这是存储前的值:
7> M2.
#map{value = [],
updates = [{{<<"post_id">>,register},
{register,<<>>,<<"testkey">>}},
{{<<"userids">>,set},
{set,[],
[<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
<<"7">>,<<"8">>,<<"9">>],
[],undefined}}],
removes = [],context = undefined}
这是最终值:
8> Map.
#map{value = [{{<<"post_id">>,register},<<"testkey">>},
{{<<"userids">>,set},
[<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
<<"7">>,<<"8">>,<<"9">>]}],
updates = [],removes = [],
context = <<131,108,0,0,0,1,104,2,109,0,0,0,8,35,9,254,
249,120,246,57,114,97,8,106>>}
更改现有地图是通过隐式提取处理的:
2> riakc_pb_socket:modify_type(Pid,
fun(OldMap) ->
riakc_map:update({<<"userids">>, set},
fun(S) ->
riakc_set:add_element(<<"100">>, S)
end, OldMap)
end,
{Type,Bucket},Key,[]).
ok
3> riakc_pb_socket:fetch_type(Pid,{Type,Bucket},Key).
{ok,{map,[{{<<"post_id">>,register},<<"testkey">>},
{{<<"userids">>,set},
[<<"1">>,<<"10">>,<<"100">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,
<<"6">>,<<"7">>,<<"8">>,<<"9">>]}],
[],[],
<<131,108,0,0,0,1,104,2,109,0,0,0,8,35,9,254,249,120,
246,57,114,97,...>>}}
我正在尝试在 Riak 中存储寄存器和集合的映射,并使用 Riak 的 pb 客户端处理它们。我的目标是保存地图,然后从返回的对象中提取持久化的地图并对其进行处理(在本例中,从集合中提取寄存器和值并转换为 JSON)。似乎尝试使用 riakc_map:fetch 从返回的对象中提取值不是正确的方法??
这是我创建它的方式(键是一个 uuid,QP 是一个整数列表):
M = riak:new_map(),
M1 = riak:map_update({<<"post_id">>, register},
fun(R) -> riakc_register:set(Key, R) end, M),
M2 = riak:map_update({<<"userids">>, set},
fun(S) ->
[riakc_set:add_element(helpers:int_to_bin(Q), S) || Q <- QP]
end, M2),
[...]
然后我将它包装在一个对象中,并保存该对象。
Obj = riakc_obj:new(<<?POST_BUCKET>>, Key, Map) %% Map created via new_map()
{ok, Obj2} = riakc_pb_socket:put(Pid, <<?POST_BUCKET>>, Obj, [return_body])
现在这就是我 运行 遇到麻烦的地方:
Post = binary_to_term(riakc_obj:get_value(Obj2)));
Keys = riakc_map:fetch_keys(Post), %% returns []
OrigMap = riakc_map:value(Post), %% returns []
IsKey1 = riakc_map:is_key({<<"post_id">>, register}, Post), %% false
IsKey2 = riakc_map:is_key({<<"post_id">>}, Post), %% false
最重要的
PostId = riakc_map:fetch({<<"post_id">>, register}, Post),
死于
{function_clause, [{orddict,fetch, ...
Post 在 binary_to_term(riakc_obj:get_value(Obj2))); 之后,就我而言看起来是正确的可以说:
< Post = {map,[],
[{{<<"content">>,register},
{register,<<>>,<<"<post>Hello, World!\r\n</post>">>}},
{{<<"post_id">>,register},
{register,<<>>,<<"238e4300-a651-11e4-86c8-6003088f077a">>}},
{{<<"userids">>,set},
[{set,[],[<<"-1">>],[],undefined}]},
.....
非常感谢任何帮助!
有些地方不太对劲。此更新调用
M2 = riakc_map:update({<<"userids">>, set},
fun(S) ->
[riakc_set:add_element(helpers:int_to_bin(Q), S) || Q <- QP]
end, M1),
不会创建包含多个成员的集合,它会创建一个集合列表,每个集合都有一个成员。如果您尝试构建一组整数,请使用折叠而不是列表理解:
M2 = riakc_map:update({<<"userids">>, set},
fun(S) ->
lists:foldl(
fun(Q, Acc) ->
riakc_set:add_element(helpers:int_to_bin(Q), Acc)
end,
S, QP),
end, M1),
这实际上并没有更新地图的值,它在记录中分阶段更新:
#map{value = [],
updates = [{{<<"post_id">>,register},
{register,<<>>,<<"uuid">>}},
{{<<"userids">>,set},
{set,[],
[<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
<<"7">>,<<"8">>,<<"9">>],
[],undefined}}],
removes = [],context = undefined}
此时,riakc_map:value
returns []
因为那是记录中的值。要将分阶段更新应用到值,您需要调用 riakc_pb_socket:modify_type
,这也会将值存储在 Riak 中。
完整过程
准备 Riak 集群以使用地图 CRDT
创建存储地图的桶类型
root@node# riak-admin bucket-type create maps '{"props":{"datatype":"map"}}'
激活桶类型
root@node# riak-admin bucket-type activate maps
创建地图并更新 Riak
1> rr(riakc_map).
[map]
2> {ok,Pid}=riakc_pb_socket:start("127.0.0.1",8087),
2> M=riakc_map:new(),
2> Key = <<"testkey">>,
2> Bucket = <<"testbucket">>,
2> Type = <<"maps">>,
2> QP = lists:seq(1,10),
2> M1 = riakc_map:update({<<"post_id">>, register},
2> fun(R) -> riakc_register:set(Key, R) end, M),
2> M2 = riakc_map:update({<<"userids">>, set},
2> fun(S) ->
2> lists:foldl(fun(Q,Acc) -> riakc_set:add_element(list_to_binary(integer_to_list(Q)), Acc) end,S, QP)
2> end, M1),
2> riakc_pb_socket:modify_type(Pid,fun(_) -> M2 end,{Type,Bucket},Key,[create]).
ok
检索地图并检查值
3> {ok,Map} = riakc_pb_socket:fetch_type(Pid,{Type,Bucket},Key),
3> riakc_map:value(Map).
[{{<<"post_id">>,register},<<"testkey">>},
{{<<"userids">>,set},
[<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
<<"7">>,<<"8">>,<<"9">>]}]
4> riakc_map:fetch_keys(Map).
[{<<"post_id">>,register},{<<"userids">>,set}]
5> riakc_map:is_key({<<"post_id">>, register}, Map).
true
6> riakc_map:is_key({<<"post_id">>}, Map).
false
为了比较,这是存储前的值:
7> M2.
#map{value = [],
updates = [{{<<"post_id">>,register},
{register,<<>>,<<"testkey">>}},
{{<<"userids">>,set},
{set,[],
[<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
<<"7">>,<<"8">>,<<"9">>],
[],undefined}}],
removes = [],context = undefined}
这是最终值:
8> Map.
#map{value = [{{<<"post_id">>,register},<<"testkey">>},
{{<<"userids">>,set},
[<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
<<"7">>,<<"8">>,<<"9">>]}],
updates = [],removes = [],
context = <<131,108,0,0,0,1,104,2,109,0,0,0,8,35,9,254,
249,120,246,57,114,97,8,106>>}
更改现有地图是通过隐式提取处理的:
2> riakc_pb_socket:modify_type(Pid,
fun(OldMap) ->
riakc_map:update({<<"userids">>, set},
fun(S) ->
riakc_set:add_element(<<"100">>, S)
end, OldMap)
end,
{Type,Bucket},Key,[]).
ok
3> riakc_pb_socket:fetch_type(Pid,{Type,Bucket},Key).
{ok,{map,[{{<<"post_id">>,register},<<"testkey">>},
{{<<"userids">>,set},
[<<"1">>,<<"10">>,<<"100">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,
<<"6">>,<<"7">>,<<"8">>,<<"9">>]}],
[],[],
<<131,108,0,0,0,1,104,2,109,0,0,0,8,35,9,254,249,120,
246,57,114,97,...>>}}