Riak 的 erlang pb 客户端和地图获取

Riak's erlang pb client and map fetch

我正在尝试在 Riak 中存储寄存器和集合的映射,并使用 Riak 的 pb 客户端处理它们。我的目标是保存地图,然后从返回的对象中提取持久化的地图并对其进行处理(在本例中,从集合中提取寄存器和值并转换为 JSON)。似乎尝试使用 riakc_map:fetch 从返回的对象中提取值不是正确的方法??

这是我创建它的方式(键是一个 uuid,QP 是一个整数列表):

M = riak:new_map(),
M1 = riak:map_update({<<"post_id">>, register},
    fun(R) -> riakc_register:set(Key, R) end, M),
M2 = riak:map_update({<<"userids">>, set},
    fun(S) ->
        [riakc_set:add_element(helpers:int_to_bin(Q), S) || Q <- QP]
    end, M2),
[...]

然后我将它包装在一个对象中,并保存该对象。

Obj = riakc_obj:new(<<?POST_BUCKET>>, Key, Map) %% Map created via new_map()
{ok, Obj2} = riakc_pb_socket:put(Pid, <<?POST_BUCKET>>, Obj, [return_body])

现在这就是我 运行 遇到麻烦的地方:

Post = binary_to_term(riakc_obj:get_value(Obj2)));
Keys = riakc_map:fetch_keys(Post),  %% returns []
OrigMap = riakc_map:value(Post),    %% returns []
IsKey1 = riakc_map:is_key({<<"post_id">>, register}, Post), %% false
IsKey2 = riakc_map:is_key({<<"post_id">>}, Post),   %% false

最重要的

PostId = riakc_map:fetch({<<"post_id">>, register}, Post),

死于

{function_clause, [{orddict,fetch, ...

Post 在 binary_to_term(riakc_obj:get_value(Obj2))); 之后,就我而言看起来是正确的可以说:

< Post = {map,[],
          [{{<<"content">>,register},
            {register,<<>>,<<"<post>Hello, World!\r\n</post>">>}},
           {{<<"post_id">>,register},
            {register,<<>>,<<"238e4300-a651-11e4-86c8-6003088f077a">>}},
           {{<<"userids">>,set},
              [{set,[],[<<"-1">>],[],undefined}]},
            .....

非常感谢任何帮助!

有些地方不太对劲。此更新调用

M2 = riakc_map:update({<<"userids">>, set}, 
    fun(S) ->
        [riakc_set:add_element(helpers:int_to_bin(Q), S) || Q <- QP]
    end, M1),

不会创建包含多个成员的集合,它会创建一个集合列表,每个集合都有一个成员。如果您尝试构建一组整数,请使用折叠而不是列表理解:

M2 = riakc_map:update({<<"userids">>, set}, 
    fun(S) ->
        lists:foldl(
               fun(Q, Acc) ->
                   riakc_set:add_element(helpers:int_to_bin(Q), Acc) 
               end, 
               S, QP),
    end, M1),

这实际上并没有更新地图的值,它在记录中分阶段更新:

#map{value = [],
     updates = [{{<<"post_id">>,register},
                 {register,<<>>,<<"uuid">>}},
                {{<<"userids">>,set},
                 {set,[],
                      [<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
                       <<"7">>,<<"8">>,<<"9">>],
                      [],undefined}}],
     removes = [],context = undefined}

此时,riakc_map:value returns [] 因为那是记录中的值。要将分阶段更新应用到值,您需要调用 riakc_pb_socket:modify_type,这也会将值存储在 Riak 中。


完整过程

准备 Riak 集群以使用地图 CRDT

  • 创建存储地图的桶类型
    root@node# riak-admin bucket-type create maps '{"props":{"datatype":"map"}}'

  • 激活桶类型
    root@node# riak-admin bucket-type activate maps

创建地图并更新 Riak

1> rr(riakc_map).
[map]
2> {ok,Pid}=riakc_pb_socket:start("127.0.0.1",8087),
2> M=riakc_map:new(),
2> Key = <<"testkey">>,
2> Bucket = <<"testbucket">>,
2> Type = <<"maps">>,
2> QP = lists:seq(1,10),
2> M1 = riakc_map:update({<<"post_id">>, register},
2> fun(R) -> riakc_register:set(Key, R) end, M),
2> M2 = riakc_map:update({<<"userids">>, set},
2> fun(S) ->
2> lists:foldl(fun(Q,Acc) -> riakc_set:add_element(list_to_binary(integer_to_list(Q)), Acc) end,S, QP)
2> end, M1),
2> riakc_pb_socket:modify_type(Pid,fun(_) -> M2 end,{Type,Bucket},Key,[create]).
ok

检索地图并检查值

3> {ok,Map} = riakc_pb_socket:fetch_type(Pid,{Type,Bucket},Key),
3> riakc_map:value(Map).
[{{<<"post_id">>,register},<<"testkey">>},
 {{<<"userids">>,set},
  [<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
   <<"7">>,<<"8">>,<<"9">>]}]
4> riakc_map:fetch_keys(Map).
[{<<"post_id">>,register},{<<"userids">>,set}]
5> riakc_map:is_key({<<"post_id">>, register}, Map).
true
6> riakc_map:is_key({<<"post_id">>}, Map).
false

为了比较,这是存储前的值:

7> M2.

#map{value = [],
     updates = [{{<<"post_id">>,register},
                 {register,<<>>,<<"testkey">>}},
                {{<<"userids">>,set},
                 {set,[],
                      [<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
                       <<"7">>,<<"8">>,<<"9">>],
                      [],undefined}}],
     removes = [],context = undefined}

这是最终值:

8> Map.
#map{value = [{{<<"post_id">>,register},<<"testkey">>},
              {{<<"userids">>,set},
               [<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
                <<"7">>,<<"8">>,<<"9">>]}],
     updates = [],removes = [],
     context = <<131,108,0,0,0,1,104,2,109,0,0,0,8,35,9,254,
                 249,120,246,57,114,97,8,106>>}

更改现有地图是通过隐式提取处理的:

2> riakc_pb_socket:modify_type(Pid,
                 fun(OldMap) ->
                    riakc_map:update({<<"userids">>, set},
                               fun(S) ->
                                   riakc_set:add_element(<<"100">>, S)
                               end, OldMap)
                 end,
                 {Type,Bucket},Key,[]).
ok
3> riakc_pb_socket:fetch_type(Pid,{Type,Bucket},Key).
{ok,{map,[{{<<"post_id">>,register},<<"testkey">>},
      {{<<"userids">>,set},
       [<<"1">>,<<"10">>,<<"100">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,
        <<"6">>,<<"7">>,<<"8">>,<<"9">>]}],
     [],[],
     <<131,108,0,0,0,1,104,2,109,0,0,0,8,35,9,254,249,120,
       246,57,114,97,...>>}}