[RIAK]InternalAPI-in-CommitHook
[RIAK]InternalAPI-in-CommitHook
我是 erlang 和 riak 的新手。几个月前我开始使用 riak 作为 kv 存储。现在我想为 riak 实现一个提交挂钩,以便 riak 可以帮助我进行一些统计。
我阅读了一些文档并编写了一个预挂钩脚本,它将获取对象键并将其存储到一个集合中。
如果只有一个客户端写入 riak,这个钩子工作正常,但如果我增加到 riak 写入的连接,我发现它丢失了集合中的一些元素。看起来 crdt_op 没有进行合并 operation.And 日志文件中没有明显的错误。
谁能帮我指出发生了什么或我错过了什么。
我正在使用 riak 2.1.3
谢谢大家!
这是钩子脚本:
-module(myhook).
-export([pretest/1]).
now_to_local_string({MegaSecs, Secs, MicroSecs}) ->
LocalTime = calendar:now_to_local_time({MegaSecs, Secs, MicroSecs}),
{{Year, Month, Day}, {Hour, Minute, _}} = LocalTime,
TimeStr = lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w",
[Year, Month, Day, Hour, Minute])),
TimeStr.
is_deleted(Object)->
case dict:find(<<"X-Riak-Deleted">>,riak_object:get_metadata(Object)) of
{ok,_} ->
true;
_ ->
false
end.
pretest(Object) ->
% timer:sleep(10000),
try
ObjBucket = riak_object:bucket(Object),
% riak_object:bucket(Obj).
Bucket = element(2, ObjBucket),
BucketType = element(1, ObjBucket),
ObjKey = riak_object:key(Object),
% Key = binary_to_list(ObjKey),
% ObjData = riak_object:get_value(Object),
% Msg = binary_to_list(ObjData),
CommitItem = iolist_to_binary(mochijson2:encode({struct, [{b, Bucket}, {k, ObjKey}, {t, BucketType}]})),
case is_deleted(Object) of
true ->
KeyPrefix = "delete";
_ ->
KeyPrefix = "update"
end,
CurMin = now_to_local_string(os:timestamp()),
IndexKey = binary:list_to_bin(io_lib:format("~s-~s", [CurMin, KeyPrefix])),
%% Get a riak client
{ok, C} = riak:local_client(),
% get node obj
ThisNode = atom_to_binary(node(), latin1),
% get index obj and set context
BType = <<"archive">>,
B = <<"local-test">>,
{SetObj, Context} = case C:get({BType, B}, IndexKey) of
{error, notfound} ->
ThisSetObj = riak_kv_crdt:new({BType, B}, IndexKey, riak_dt_orswot),
{ThisSetObj, undefined};
{ok, ThisSetObj} ->
% The datatype update requires the context if the value exists
{{Ctx, _}, _} = riak_kv_crdt:value(ThisSetObj, riak_dt_orswot),
{ThisSetObj, Ctx}
end,
UpdateIndex = [{add, CommitItem}],
UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, Context},
% UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, undefined},
NewObj = riak_kv_crdt:update(SetObj, ThisNode, UpdateOp),
error_logger:info_msg("Updating index for ~s,to set ~s~n", [binary:bin_to_list(CommitItem), IndexKey]),
C:put(NewObj),
Object
catch
error:Error ->
{fail, lists:flatten(io_lib:format("[PREHOOKEXCEPTION]~p",[Error]))}
end.
这是设置的水桶道具
active: true
allow_mult: true
basic_quorum: false
big_vclock: 50
chash_keyfun: {riak_core_util,chash_std_keyfun}
claimant: 'riak@192.168.100.2'
datatype: set
dvv_enabled: true
dw: quorum
last_write_wins: false
linkfun: {modfun,riak_kv_wm_link_walker,mapreduce_linkfun}
n_val: 3
notfound_ok: true
old_vclock: 86400
postcommit: []
pr: 0
precommit: []
pw: 0
r: quorum
rw: quorum
small_vclock: 50
w: quorum
young_vclock: 20
你的 put C:put(NewObj)
没有 Riak 在处理 CRDT 时使用的相同选项。
Riak KV 使用此函数从客户端请求更新 CRDT:
https://github.com/basho/riak_kv/blob/2.1.3/src/riak_kv_pb_crdt.erl#L162-L175
maybe_update({true, true}, Req, State0) ->
#dtupdatereq{bucket=B, key=K, type=BType,
include_context=InclCtx,
context=Ctx} = Req,
#state{client=C, mod=Mod, op=Op} = State0,
{Key, ReturnKey} = get_key(K),
O = riak_kv_crdt:new({BType, B}, Key, Mod),
Options0 = make_options(Req),
CrdtOp = make_operation(Mod, Op, Ctx),
Options = [{crdt_op, CrdtOp},
{retry_put_coordinator_failure, false}] ++ Options0,
Resp = C:put(O, Options),
State = State0#state{return_key=ReturnKey, return_ctx=InclCtx},
process_update_response(Resp, State);
请注意,它明确传递了 crdt_op
和 retry_put_coordinator_failure
选项。
作为 crdt_op
的值传递的记录是从此函数生成的:
make_operation(Mod, Op, Ctx) ->
#crdt_op{mod=Mod, op=Op, ctx=Ctx}.
我是 erlang 和 riak 的新手。几个月前我开始使用 riak 作为 kv 存储。现在我想为 riak 实现一个提交挂钩,以便 riak 可以帮助我进行一些统计。 我阅读了一些文档并编写了一个预挂钩脚本,它将获取对象键并将其存储到一个集合中。 如果只有一个客户端写入 riak,这个钩子工作正常,但如果我增加到 riak 写入的连接,我发现它丢失了集合中的一些元素。看起来 crdt_op 没有进行合并 operation.And 日志文件中没有明显的错误。
谁能帮我指出发生了什么或我错过了什么。
我正在使用 riak 2.1.3
谢谢大家!
这是钩子脚本:
-module(myhook).
-export([pretest/1]).
now_to_local_string({MegaSecs, Secs, MicroSecs}) ->
LocalTime = calendar:now_to_local_time({MegaSecs, Secs, MicroSecs}),
{{Year, Month, Day}, {Hour, Minute, _}} = LocalTime,
TimeStr = lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w",
[Year, Month, Day, Hour, Minute])),
TimeStr.
is_deleted(Object)->
case dict:find(<<"X-Riak-Deleted">>,riak_object:get_metadata(Object)) of
{ok,_} ->
true;
_ ->
false
end.
pretest(Object) ->
% timer:sleep(10000),
try
ObjBucket = riak_object:bucket(Object),
% riak_object:bucket(Obj).
Bucket = element(2, ObjBucket),
BucketType = element(1, ObjBucket),
ObjKey = riak_object:key(Object),
% Key = binary_to_list(ObjKey),
% ObjData = riak_object:get_value(Object),
% Msg = binary_to_list(ObjData),
CommitItem = iolist_to_binary(mochijson2:encode({struct, [{b, Bucket}, {k, ObjKey}, {t, BucketType}]})),
case is_deleted(Object) of
true ->
KeyPrefix = "delete";
_ ->
KeyPrefix = "update"
end,
CurMin = now_to_local_string(os:timestamp()),
IndexKey = binary:list_to_bin(io_lib:format("~s-~s", [CurMin, KeyPrefix])),
%% Get a riak client
{ok, C} = riak:local_client(),
% get node obj
ThisNode = atom_to_binary(node(), latin1),
% get index obj and set context
BType = <<"archive">>,
B = <<"local-test">>,
{SetObj, Context} = case C:get({BType, B}, IndexKey) of
{error, notfound} ->
ThisSetObj = riak_kv_crdt:new({BType, B}, IndexKey, riak_dt_orswot),
{ThisSetObj, undefined};
{ok, ThisSetObj} ->
% The datatype update requires the context if the value exists
{{Ctx, _}, _} = riak_kv_crdt:value(ThisSetObj, riak_dt_orswot),
{ThisSetObj, Ctx}
end,
UpdateIndex = [{add, CommitItem}],
UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, Context},
% UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, undefined},
NewObj = riak_kv_crdt:update(SetObj, ThisNode, UpdateOp),
error_logger:info_msg("Updating index for ~s,to set ~s~n", [binary:bin_to_list(CommitItem), IndexKey]),
C:put(NewObj),
Object
catch
error:Error ->
{fail, lists:flatten(io_lib:format("[PREHOOKEXCEPTION]~p",[Error]))}
end.
这是设置的水桶道具
active: true
allow_mult: true
basic_quorum: false
big_vclock: 50
chash_keyfun: {riak_core_util,chash_std_keyfun}
claimant: 'riak@192.168.100.2'
datatype: set
dvv_enabled: true
dw: quorum
last_write_wins: false
linkfun: {modfun,riak_kv_wm_link_walker,mapreduce_linkfun}
n_val: 3
notfound_ok: true
old_vclock: 86400
postcommit: []
pr: 0
precommit: []
pw: 0
r: quorum
rw: quorum
small_vclock: 50
w: quorum
young_vclock: 20
你的 put C:put(NewObj)
没有 Riak 在处理 CRDT 时使用的相同选项。
Riak KV 使用此函数从客户端请求更新 CRDT:
https://github.com/basho/riak_kv/blob/2.1.3/src/riak_kv_pb_crdt.erl#L162-L175
maybe_update({true, true}, Req, State0) ->
#dtupdatereq{bucket=B, key=K, type=BType,
include_context=InclCtx,
context=Ctx} = Req,
#state{client=C, mod=Mod, op=Op} = State0,
{Key, ReturnKey} = get_key(K),
O = riak_kv_crdt:new({BType, B}, Key, Mod),
Options0 = make_options(Req),
CrdtOp = make_operation(Mod, Op, Ctx),
Options = [{crdt_op, CrdtOp},
{retry_put_coordinator_failure, false}] ++ Options0,
Resp = C:put(O, Options),
State = State0#state{return_key=ReturnKey, return_ctx=InclCtx},
process_update_response(Resp, State);
请注意,它明确传递了 crdt_op
和 retry_put_coordinator_failure
选项。
作为 crdt_op
的值传递的记录是从此函数生成的:
make_operation(Mod, Op, Ctx) ->
#crdt_op{mod=Mod, op=Op, ctx=Ctx}.