并行化时 Postgres 自定义聚合 returns null
Postgres custom aggregation returns null when parallelized
我在 postgres 11.3 中创建了一个自定义聚合,如果关闭,它可以在并行时工作。当我将其标记为 parallel = safe 时,它 returns 为空。
有人可以指出我从哪里开始寻找的方向,或者我如何调试 postgres 中的并行聚合?在非并行聚合中,我可以将每条记录的状态插入临时 table,但在并行查询中不允许插入...
这是合计:
CREATE OR REPLACE FUNCTION array_sort(ANYARRAY)
RETURNS ANYARRAY LANGUAGE SQL
AS $$
SELECT ARRAY(SELECT unnest() ORDER BY 1)
$$;
create type _stats_agg_accum_type AS (
cnt bigint,
q double precision[],
n double precision[],
np double precision[],
dn double precision[]
);
create type _stats_agg_result_type AS (
count bigint,
q25 double precision,
q50 double precision,
q75 double precision
);
create or replace function _stats_agg_p2_parabolic(_stats_agg_accum_type, double precision, double precision)
returns double precision AS '
DECLARE
a alias for ;
i alias for ;
d alias for ;
BEGIN
RETURN a.q[i] + d / (a.n[i + 1] - a.n[i - 1]) * ((a.n[i] - a.n[i - 1] + d) * (a.q[i + 1] - a.q[i]) / (a.n[i + 1] - a.n[i]) + (a.n[i + 1] - a.n[i] - d) * (a.q[i] - a.q[i - 1]) / (a.n[i] - a.n[i - 1]));
END;
'
language plpgsql;
create or replace function _stats_agg_p2_linear(_stats_agg_accum_type, double precision, double precision)
returns double precision AS '
DECLARE
a alias for ;
i alias for ;
d alias for ;
BEGIN
return a.q[i] + d * (a.q[i + d] - a.q[i]) / (a.n[i + d] - a.n[i]);
END;
'
language plpgsql;
create or replace function _stats_agg_accumulator(_stats_agg_accum_type, double precision)
returns _stats_agg_accum_type AS '
DECLARE
a ALIAS FOR ;
x alias for ;
k int;
d double precision;
qp double precision;
BEGIN
a.cnt = a.cnt + 1;
if a.cnt <= 5 then
a.q = array_append(a.q, x);
if a.cnt = 5 then
a.q = array_sort(a.q);
end if;
return a;
end if;
case
when x < a.q[1] then
a.q[1] = x;
k = 1;
when x >= a.q[1] and x < a.q[2] then
k = 1;
when x >= a.q[2] and x < a.q[3] then
k = 2;
when x >= a.q[3] and x < a.q[4] then
k = 3;
when x >= a.q[4] and x <= a.q[5] then
k = 4;
when x > a.q[5] then
a.q[5] = x;
k = 4;
end case;
for ii in 1..5 loop
if ii > k then
a.n[ii] = a.n[ii] + 1;
end if;
a.np[ii] = a.np[ii] + a.dn[ii];
end loop;
for ii in 2..4 loop
d = a.np[ii] - a.n[ii];
if (d >= 1 and a.n[ii+1] - a.n[ii] > 1) or (d <= -1 and a.n[ii-1] - a.n[ii] < -1) then
d = sign(d);
qp = _stats_agg_p2_parabolic(a, ii, d);
if qp > a.q[ii-1] and qp < a.q[ii+1] then
a.q[ii] = qp;
else
a.q[ii] = _stats_agg_p2_linear(a, ii, d);
end if;
a.n[ii] = a.n[ii] + d;
end if;
end loop;
return a;
END;
'
language plpgsql;
create or replace function _stats_agg_combiner(_stats_agg_accum_type, _stats_agg_accum_type)
returns _stats_agg_accum_type AS '
DECLARE
a alias for ;
b alias for ;
c _stats_agg_accum_type;
BEGIN
c.cnt = a.cnt + b.cnt;
c.q[2] = (a.q[2] + b.q[2]) / 2;
c.q[3] = (a.q[3] + b.q[3]) / 2;
c.q[4] = (a.q[4] + b.q[4]) / 2;
RETURN c;
END;
'
strict language plpgsql;
create or replace function _stats_agg_finalizer(_stats_agg_accum_type)
returns _stats_agg_result_type AS '
BEGIN
RETURN row(
.cnt,
.q[2],
.q[3],
.q[4]
);
END;
'
language plpgsql;
create aggregate stats_agg(double precision) (
sfunc = _stats_agg_accumulator,
stype = _stats_agg_accum_type,
finalfunc = _stats_agg_finalizer,
combinefunc = _stats_agg_combiner,
--parallel = safe,
initcond = '(0, {}, "{1,2,3,4,5}", "{1,2,3,4,5}", "{0,0.25,0.5,0.75,1}")'
);
这是设置和 运行 代码:
--CREATE TABLE temp (val double precision);
--insert into temp (val) select i from generate_series(0, 150000) as t(i);
select (stats_agg(val)).* from temp;
预期结果如下,当运行 in parallel = unsafe
时有效
150001, 37500, 75000, 112500
并行=安全我得到空值:
150001, null, null, null
问题出在 _stats_agg_combiner
函数中。函数定义包含 strict
关键字,因此无需检查空输入值。
在此特定聚合中,_stats_agg_accum_type
包含多个数组,_stats_agg_combiner
函数要求这些数组至少填充 5 个条目。这假设每个新的 _stats_agg_accum_type
实例在传递给 _stats_agg_combiner
函数之前至少处理 5 条记录。
正在 table 上进行测试,其中包含 150k 条记录,并假设每个实例因此至少接收 5 条记录。无论出于何种原因,这是一个不正确的假设。无论使用多少工人(用 1-4 测试),总是至少有一个实例恰好处理了 0 条记录。
解决方案是添加对已处理零条记录且数组长度为 0 的 _stats_agg_accum_type
实例的支持。请参见下面的代码。
create or replace function _stats_agg_combiner(_stats_agg_accum_type, _stats_agg_accum_type)
returns _stats_agg_accum_type AS '
DECLARE
a alias for ;
b alias for ;
c _stats_agg_accum_type;
addA boolean;
addB boolean;
BEGIN
addA = a.cnt <= 5;
addB = b.cnt <= 5;
if addA and not addB then
c = b;
elsif addB and not addA then
c = a;
else
c.cnt = a.cnt + b.cnt;
for ii in 2..4 loop
c.q[ii] = (a.q[ii] + b.q[ii]) / 2;
end loop;
end if;
for ii in 1..5 loop
if addA and ii <= a.cnt then
c = _stats_agg_accumulator(c, a.q[ii]);
end if;
if addB and ii <= b.cnt then
c = _stats_agg_accumulator(c, b.q[ii]);
end if;
end loop;
RETURN c;
END;
'
language plpgsql strict;
我在 postgres 11.3 中创建了一个自定义聚合,如果关闭,它可以在并行时工作。当我将其标记为 parallel = safe 时,它 returns 为空。
有人可以指出我从哪里开始寻找的方向,或者我如何调试 postgres 中的并行聚合?在非并行聚合中,我可以将每条记录的状态插入临时 table,但在并行查询中不允许插入...
这是合计:
CREATE OR REPLACE FUNCTION array_sort(ANYARRAY)
RETURNS ANYARRAY LANGUAGE SQL
AS $$
SELECT ARRAY(SELECT unnest() ORDER BY 1)
$$;
create type _stats_agg_accum_type AS (
cnt bigint,
q double precision[],
n double precision[],
np double precision[],
dn double precision[]
);
create type _stats_agg_result_type AS (
count bigint,
q25 double precision,
q50 double precision,
q75 double precision
);
create or replace function _stats_agg_p2_parabolic(_stats_agg_accum_type, double precision, double precision)
returns double precision AS '
DECLARE
a alias for ;
i alias for ;
d alias for ;
BEGIN
RETURN a.q[i] + d / (a.n[i + 1] - a.n[i - 1]) * ((a.n[i] - a.n[i - 1] + d) * (a.q[i + 1] - a.q[i]) / (a.n[i + 1] - a.n[i]) + (a.n[i + 1] - a.n[i] - d) * (a.q[i] - a.q[i - 1]) / (a.n[i] - a.n[i - 1]));
END;
'
language plpgsql;
create or replace function _stats_agg_p2_linear(_stats_agg_accum_type, double precision, double precision)
returns double precision AS '
DECLARE
a alias for ;
i alias for ;
d alias for ;
BEGIN
return a.q[i] + d * (a.q[i + d] - a.q[i]) / (a.n[i + d] - a.n[i]);
END;
'
language plpgsql;
create or replace function _stats_agg_accumulator(_stats_agg_accum_type, double precision)
returns _stats_agg_accum_type AS '
DECLARE
a ALIAS FOR ;
x alias for ;
k int;
d double precision;
qp double precision;
BEGIN
a.cnt = a.cnt + 1;
if a.cnt <= 5 then
a.q = array_append(a.q, x);
if a.cnt = 5 then
a.q = array_sort(a.q);
end if;
return a;
end if;
case
when x < a.q[1] then
a.q[1] = x;
k = 1;
when x >= a.q[1] and x < a.q[2] then
k = 1;
when x >= a.q[2] and x < a.q[3] then
k = 2;
when x >= a.q[3] and x < a.q[4] then
k = 3;
when x >= a.q[4] and x <= a.q[5] then
k = 4;
when x > a.q[5] then
a.q[5] = x;
k = 4;
end case;
for ii in 1..5 loop
if ii > k then
a.n[ii] = a.n[ii] + 1;
end if;
a.np[ii] = a.np[ii] + a.dn[ii];
end loop;
for ii in 2..4 loop
d = a.np[ii] - a.n[ii];
if (d >= 1 and a.n[ii+1] - a.n[ii] > 1) or (d <= -1 and a.n[ii-1] - a.n[ii] < -1) then
d = sign(d);
qp = _stats_agg_p2_parabolic(a, ii, d);
if qp > a.q[ii-1] and qp < a.q[ii+1] then
a.q[ii] = qp;
else
a.q[ii] = _stats_agg_p2_linear(a, ii, d);
end if;
a.n[ii] = a.n[ii] + d;
end if;
end loop;
return a;
END;
'
language plpgsql;
create or replace function _stats_agg_combiner(_stats_agg_accum_type, _stats_agg_accum_type)
returns _stats_agg_accum_type AS '
DECLARE
a alias for ;
b alias for ;
c _stats_agg_accum_type;
BEGIN
c.cnt = a.cnt + b.cnt;
c.q[2] = (a.q[2] + b.q[2]) / 2;
c.q[3] = (a.q[3] + b.q[3]) / 2;
c.q[4] = (a.q[4] + b.q[4]) / 2;
RETURN c;
END;
'
strict language plpgsql;
create or replace function _stats_agg_finalizer(_stats_agg_accum_type)
returns _stats_agg_result_type AS '
BEGIN
RETURN row(
.cnt,
.q[2],
.q[3],
.q[4]
);
END;
'
language plpgsql;
create aggregate stats_agg(double precision) (
sfunc = _stats_agg_accumulator,
stype = _stats_agg_accum_type,
finalfunc = _stats_agg_finalizer,
combinefunc = _stats_agg_combiner,
--parallel = safe,
initcond = '(0, {}, "{1,2,3,4,5}", "{1,2,3,4,5}", "{0,0.25,0.5,0.75,1}")'
);
这是设置和 运行 代码:
--CREATE TABLE temp (val double precision);
--insert into temp (val) select i from generate_series(0, 150000) as t(i);
select (stats_agg(val)).* from temp;
预期结果如下,当运行 in parallel = unsafe
时有效150001, 37500, 75000, 112500
并行=安全我得到空值:
150001, null, null, null
问题出在 _stats_agg_combiner
函数中。函数定义包含 strict
关键字,因此无需检查空输入值。
在此特定聚合中,_stats_agg_accum_type
包含多个数组,_stats_agg_combiner
函数要求这些数组至少填充 5 个条目。这假设每个新的 _stats_agg_accum_type
实例在传递给 _stats_agg_combiner
函数之前至少处理 5 条记录。
正在 table 上进行测试,其中包含 150k 条记录,并假设每个实例因此至少接收 5 条记录。无论出于何种原因,这是一个不正确的假设。无论使用多少工人(用 1-4 测试),总是至少有一个实例恰好处理了 0 条记录。
解决方案是添加对已处理零条记录且数组长度为 0 的 _stats_agg_accum_type
实例的支持。请参见下面的代码。
create or replace function _stats_agg_combiner(_stats_agg_accum_type, _stats_agg_accum_type)
returns _stats_agg_accum_type AS '
DECLARE
a alias for ;
b alias for ;
c _stats_agg_accum_type;
addA boolean;
addB boolean;
BEGIN
addA = a.cnt <= 5;
addB = b.cnt <= 5;
if addA and not addB then
c = b;
elsif addB and not addA then
c = a;
else
c.cnt = a.cnt + b.cnt;
for ii in 2..4 loop
c.q[ii] = (a.q[ii] + b.q[ii]) / 2;
end loop;
end if;
for ii in 1..5 loop
if addA and ii <= a.cnt then
c = _stats_agg_accumulator(c, a.q[ii]);
end if;
if addB and ii <= b.cnt then
c = _stats_agg_accumulator(c, b.q[ii]);
end if;
end loop;
RETURN c;
END;
'
language plpgsql strict;