为什么我的 Postgres VarBit(C 语言函数)中的某些位针对特定输入被置零?
Why are some of the bits in my Postgres VarBit (C-language function) being zeroed for specific input?
我正在尝试为 Postgres 编写一个 C 语言函数,它接受一个整数数组并将整数转换为表示数组中整数的莫顿编码(z 曲线编码)的位串。传入两个参数:第一个是实际数组,第二个是每个整数要使用的位数,从最低有效位开始(最大可能是 31,因为我们必须使用无符号整数)。我正在使用我在 Postgres 之外测试过的实现(请参阅代码中的注释)并且我知道它可以工作。
代码(没有对整数、大小界限等进行一些明显必要的检查):
#include <postgres.h>
#include <utils/array.h>
#include <utils/varbit.h>
#include <fmgr.h>
#include <limits.h>
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
#define MC_SEGMENT(bit, dim, ndim) ((((bit)*(ndim)) + dim) / CHAR_BIT)
#define MC_SEGMENTBIT(bit, dim, ndim) ((((bit)*(ndim)) + dim) % CHAR_BIT)
Datum pg_morton_encode_integer(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(pg_morton_encode_integer);
Datum
pg_morton_encode_integer(PG_FUNCTION_ARGS)
{
ArrayType *input_vector;
int32 *toencode;
int i, n, rlen, len;
int16 num_bits_per;
VarBit *result;
int mask;
int ipad;
char tempvalue;
int bit;
int dim;
int num_bits;
int num_bytes;
/* Ensure first argument is not null. */
if (PG_ARGISNULL(0))
{
ereport(ERROR, (errmsg("First operand must be non-null")));
}
/* Ensure second argument is not null. */
if (PG_ARGISNULL(1))
{
ereport(ERROR, (errmsg("Second operand must be non-null")));
}
/* Get input array and its length (first argument). */
input_vector = PG_GETARG_ARRAYTYPE_P(0);
n = (ARR_DIMS(input_vector))[0];
ereport(NOTICE, (errmsg("n=%d", n) ));
toencode = (int32 *)ARR_DATA_PTR(input_vector);
/* Get number of bits per dimensions (second argument). */
num_bits_per = PG_GETARG_INT16(1);
/* Allocated the VarBit. */
len = n*num_bits_per;
rlen = VARBITTOTALLEN(len);
result = palloc0(rlen);
SET_VARSIZE(result, rlen);
VARBITLEN(result)
= len;
/* Perform the morton encoding. */
num_bits = num_bits_per*n;
num_bytes = (num_bits/8) + (num_bits % 8 == 0 ? 0 : 1);
/*** TESTED THIS CODE, IT WORKS, BUT NOT IN POSTGRES. :( ***/
for (bit = 0; bit < num_bits_per; ++bit)
for (dim = 0; dim < n; ++dim)
{
tempvalue
= VARBITS(result)[MC_SEGMENT(bit, dim, n)];
tempvalue
|= (char)
(((toencode[dim] & (1 << bit)) >> bit)
<< MC_SEGMENTBIT(bit, dim, n));
VARBITS(result)[MC_SEGMENT(bit, dim, n)]
= tempvalue;
ereport(NOTICE, (errmsg("[%d,%d]=%d:%x", MC_SEGMENT(bit, dim, n), MC_SEGMENTBIT(bit,dim,n), (((toencode[dim] & (1 << bit)) >> bit)), (int)tempvalue)) );
}
/*** END OF TESTED CODE. ***/
PG_RETURN_VARBIT_P(result);
}
生成文件:
MODULE_big = pgmorton
OBJS = pgmorton.o
PGXS := $(shell pg_config --pgxs)
include $(PGXS)
Make 说明(在代码和 Makefile 所在的目录中):
make install
安装后,运行 来自 postgres 的 shell 使功能可用:
DROP FUNCTION IF EXISTS pg_morton_encode_integer(integer[], smallint);
CREATE FUNCTION pg_morton_encode_integer(integer[], smallint) RETURNS varbit
AS 'pgmorton', 'pg_morton_encode_integer'
LANGUAGE C STRICT;
一些产生明显错误输入的例子:
SELECT pg_morton_encode_integer('{2147483647, 2147483647, 2147483647}'::integer[], 31::smallint); -- gives '111111111111111111111111111111111111111111111111111111111111111111111111111111111111111100011' instead of '111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111'
SELECT pg_morton_encode_integer('{2147483647, 2147483647}'::integer[], 2::smallint); -- gives '0000' instead of '1111'
我在 OS X Yosemite 上使用 Postgres.app 版本 9.4.2.0 和默认的 clang 编译器完成所有这些工作。
编辑:正如在下面对我的问题的评论中所指出的,字节需要以不同的顺序写入,因为 Postgres 通过使最高有效字节在前来规范化位串字节顺序。我还另外发现,长度为 n
位的位串,其中 n
不是 8 的倍数(CHAR_BIT,一个字节中的位数)从最低有效字节。例如,如果我的位串是 14 位,那么需要 2 个字节(16 位)来存储位串,字节 0 是最重要的,字节 1 是最不重要的。我们从最低有效字节(即字节 1)中丢失了位 0 和 1(两个最低有效位)。(下图使用 B 表示使用的位,X 表示 t运行 位,以使上述内容清楚)。
byte 0 byte 1
=================================
7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0
B B B B B B B B | B B B B B B X X
还应该注意的是,t运行cated 位只能写入零,否则有 Postgres 崩溃的风险。 :)
这是修复第一个问题的更新代码(第二个关于位 t运行cation 我仍然没有解决),注意下面代码注释中更改的行:
#include <postgres.h>
#include <utils/array.h>
#include <utils/varbit.h>
#include <fmgr.h>
#include <limits.h>
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
#define MC_SEGMENT(bit, dim, ndim) ((((bit)*(ndim)) + dim) / CHAR_BIT)
#define MC_SEGMENTBIT(bit, dim, ndim) ((((bit)*(ndim)) + dim) % CHAR_BIT)
Datum pg_morton_encode_integer(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(pg_morton_encode_integer);
Datum
pg_morton_encode_integer(PG_FUNCTION_ARGS)
{
ArrayType *input_vector;
int32 *toencode;
int i, n, rlen, len;
int16 num_bits_per;
VarBit *result;
int mask;
int ipad;
char tempvalue;
int bit;
int dim;
int num_bits;
int num_bytes;
/* Ensure first argument is not null. */
if (PG_ARGISNULL(0))
{
ereport(ERROR, (errmsg("First operand must be non-null")));
}
/* Ensure second argument is not null. */
if (PG_ARGISNULL(1))
{
ereport(ERROR, (errmsg("Second operand must be non-null")));
}
/* Get input array and its length (first argument). */
input_vector = PG_GETARG_ARRAYTYPE_P(0);
n = (ARR_DIMS(input_vector))[0];
ereport(NOTICE, (errmsg("n=%d", n) ));
toencode = (int32 *)ARR_DATA_PTR(input_vector);
/* Get number of bits per dimensions (second argument). */
num_bits_per = PG_GETARG_INT16(1);
/* Allocated the VarBit. */
len = n*num_bits_per;
rlen = VARBITTOTALLEN(len);
result = palloc0(rlen);
SET_VARSIZE(result, rlen);
VARBITLEN(result)
= len;
/* Perform the morton encoding. */
num_bits = num_bits_per*n;
num_bytes = (num_bits/8) + (num_bits % 8 == 0 ? 0 : 1);
/*** TESTED THIS CODE, IT WORKS, BUT NOT IN POSTGRES. :( ***/
for (bit = 0; bit < num_bits_per; ++bit)
for (dim = 0; dim < n; ++dim)
{
// CHANGE!
tempvalue
= VARBITS(result)[num_bytes - 1 - MC_SEGMENT(bit, dim, n)];
tempvalue
|= (char)
(((toencode[dim] & (1 << bit)) >> bit)
<< MC_SEGMENTBIT(bit, dim, n));
// CHANGE!
VARBITS(result)[num_bytes - 1 - MC_SEGMENT(bit, dim, n)]
= tempvalue;
ereport(NOTICE, (errmsg("[%d,%d]=%d:%x", MC_SEGMENT(bit, dim, n), MC_SEGMENTBIT(bit,dim,n), (((toencode[dim] & (1 << bit)) >> bit)), (int)tempvalue)) );
}
/*** END OF TESTED CODE. ***/
PG_RETURN_VARBIT_P(result);
}
最简单的解决方案(尽管不一定是最有效的)是对我们最初将位写入的超大位串执行子串操作。这实际上会 "shift" 为您自动将位转换为新的位串,这样您就不会将非零值写入截断的位中。
不幸的是,Postgres 的位串底层子串函数被声明为 static
(参见 bitsubstring
中的函数 https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/varbit.c)。快速修复是将整个函数复制到您的代码中。工作解决方案如下:
#include <postgres.h>
#include <utils/array.h>
#include <utils/varbit.h>
#include <fmgr.h>
#include <limits.h>
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
// Copy bitstring substring function here.
#define MC_SEGMENT(bit, dim, ndim) ((((bit)*(ndim)) + dim) / CHAR_BIT)
#define MC_SEGMENTBIT(bit, dim, ndim) ((((bit)*(ndim)) + dim) % CHAR_BIT)
#define ROUNDUP_MULT_IS_POW_2(toRound, mult) ((toRound) + (mult) - 1) & ~((mult) - 1)
Datum pg_morton_encode_integer(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(pg_morton_encode_integer);
Datum
pg_morton_encode_integer(PG_FUNCTION_ARGS)
{
ArrayType *input_vector;
int32 *toencode;
int i, n, rlen, len;
int16 num_bits_per;
VarBit *temp, *result;
int mask;
int ipad;
char tempvalue;
int bit;
int dim;
int num_bits;
int num_bytes;
/* Ensure first argument is not null. */
if (PG_ARGISNULL(0))
{
ereport(ERROR, (errmsg("First operand must be non-null")));
}
/* Ensure second argument is not null. */
if (PG_ARGISNULL(1))
{
ereport(ERROR, (errmsg("Second operand must be non-null")));
}
/* Get input array and its length (first argument). */
input_vector = PG_GETARG_ARRAYTYPE_P(0);
n = (ARR_DIMS(input_vector))[0];
toencode = (int32 *)ARR_DATA_PTR(input_vector);
/* Get number of bits per dimensions (second argument). */
num_bits_per = PG_GETARG_INT16(1);
/* Allocated the VarBit. */
len = n*num_bits_per;
len = ROUNDUP_MULT_IS_POW_2(len, CHAR_BIT);
rlen = VARBITTOTALLEN(len);
result = palloc0(rlen);
SET_VARSIZE(temp, rlen);
VARBITLEN(temp) = len;
/* Perform the morton encoding. */
num_bits = num_bits_per*n;
num_bytes = (num_bits/8) + (num_bits % 8 == 0 ? 0 : 1);
for (bit = 0; bit < num_bits_per; ++bit)
for (dim = 0; dim < n; ++dim)
{
tempvalue
= VARBITS(temp)[num_bytes - 1 - MC_SEGMENT(bit, dim, n)];
tempvalue
|= (char)
(((toencode[dim] & (1 << bit)) >> bit)
<< MC_SEGMENTBIT(bit, dim, n));
VARBITS(temp)[num_bytes - 1 - MC_SEGMENT(bit, dim, n)]
= tempvalue;
}
if (len == num_bits)
{
result = temp;
}
else
{
result = bitsubstring(temp, len - num_bits+1, num_bits, false);
pfree(temp);
}
PG_RETURN_VARBIT_P(result);
}
底层 varbit
表示首先填充高位,因此例如 1111
将存储为字节 11110000
。您的实现从填充最后一个字节中的低位开始(因为在第一次迭代中您的移位距离全为零),并且这些位超出任何非 8 的倍数的范围。
按顺序处理输出位可能更简单,至少在概念上如此,尽管偏移量计算变得不那么直接。下面的实现似乎有效。请注意,我正在向后遍历输入数组;我看到的所有 sources 从最后一个坐标绘制第一位,但我不确定这是否是您原始代码中的意图。
Datum
pg_morton_encode_integer(PG_FUNCTION_ARGS)
{
ArrayType *input_vector;
int32 *toencode;
int16 num_bits_per;
VarBit *result;
int n, rlen, num_bits;
int bit, dim;
int in_bitpos, out_bitnum, out_bytenum, out_bitpos;
bits8 in_bitval;
/* Get input array and its length (first argument). */
input_vector = PG_GETARG_ARRAYTYPE_P(0);
n = (ARR_DIMS(input_vector))[0];
toencode = (int32 *)ARR_DATA_PTR(input_vector);
/* Get number of bits per dimensions (second argument). */
num_bits_per = PG_GETARG_INT16(1);
/* Allocated the VarBit. */
num_bits = n * num_bits_per;
rlen = VARBITTOTALLEN(num_bits);
result = palloc0(rlen);
SET_VARSIZE(result, rlen);
VARBITLEN(result) = num_bits;
/* Perform the morton encoding. */
for (bit = 0; bit < num_bits_per; ++bit)
{
in_bitpos = num_bits_per - 1 - bit;
for (dim = 0; dim < n; ++dim)
{
in_bitval = (toencode[n - dim - 1] & (1 << in_bitpos)) >> in_bitpos;
out_bitnum = bit * n + dim;
out_bytenum = out_bitnum / CHAR_BIT;
out_bitpos = CHAR_BIT - 1 - (out_bitnum % CHAR_BIT);
VARBITS(result)[out_bytenum] |= in_bitval << out_bitpos;
}
}
PG_RETURN_VARBIT_P(result);
}
这是一个简单的测试用例,它在 wiki page 上重现了 table。
SELECT
pg_morton_encode_integer(ARRAY[y], 3::INT2) AS y,
pg_morton_encode_integer(ARRAY[0,y], 3::INT2) AS x_000,
pg_morton_encode_integer(ARRAY[1,y], 3::INT2) AS x_001,
pg_morton_encode_integer(ARRAY[2,y], 3::INT2) AS x_010,
pg_morton_encode_integer(ARRAY[3,y], 3::INT2) AS x_011,
pg_morton_encode_integer(ARRAY[4,y], 3::INT2) AS x_100,
pg_morton_encode_integer(ARRAY[5,y], 3::INT2) AS x_101,
pg_morton_encode_integer(ARRAY[6,y], 3::INT2) AS x_110,
pg_morton_encode_integer(ARRAY[7,y], 3::INT2) AS x_111
FROM generate_series(0,7) s(y)
我正在尝试为 Postgres 编写一个 C 语言函数,它接受一个整数数组并将整数转换为表示数组中整数的莫顿编码(z 曲线编码)的位串。传入两个参数:第一个是实际数组,第二个是每个整数要使用的位数,从最低有效位开始(最大可能是 31,因为我们必须使用无符号整数)。我正在使用我在 Postgres 之外测试过的实现(请参阅代码中的注释)并且我知道它可以工作。
代码(没有对整数、大小界限等进行一些明显必要的检查):
#include <postgres.h>
#include <utils/array.h>
#include <utils/varbit.h>
#include <fmgr.h>
#include <limits.h>
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
#define MC_SEGMENT(bit, dim, ndim) ((((bit)*(ndim)) + dim) / CHAR_BIT)
#define MC_SEGMENTBIT(bit, dim, ndim) ((((bit)*(ndim)) + dim) % CHAR_BIT)
Datum pg_morton_encode_integer(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(pg_morton_encode_integer);
Datum
pg_morton_encode_integer(PG_FUNCTION_ARGS)
{
ArrayType *input_vector;
int32 *toencode;
int i, n, rlen, len;
int16 num_bits_per;
VarBit *result;
int mask;
int ipad;
char tempvalue;
int bit;
int dim;
int num_bits;
int num_bytes;
/* Ensure first argument is not null. */
if (PG_ARGISNULL(0))
{
ereport(ERROR, (errmsg("First operand must be non-null")));
}
/* Ensure second argument is not null. */
if (PG_ARGISNULL(1))
{
ereport(ERROR, (errmsg("Second operand must be non-null")));
}
/* Get input array and its length (first argument). */
input_vector = PG_GETARG_ARRAYTYPE_P(0);
n = (ARR_DIMS(input_vector))[0];
ereport(NOTICE, (errmsg("n=%d", n) ));
toencode = (int32 *)ARR_DATA_PTR(input_vector);
/* Get number of bits per dimensions (second argument). */
num_bits_per = PG_GETARG_INT16(1);
/* Allocated the VarBit. */
len = n*num_bits_per;
rlen = VARBITTOTALLEN(len);
result = palloc0(rlen);
SET_VARSIZE(result, rlen);
VARBITLEN(result)
= len;
/* Perform the morton encoding. */
num_bits = num_bits_per*n;
num_bytes = (num_bits/8) + (num_bits % 8 == 0 ? 0 : 1);
/*** TESTED THIS CODE, IT WORKS, BUT NOT IN POSTGRES. :( ***/
for (bit = 0; bit < num_bits_per; ++bit)
for (dim = 0; dim < n; ++dim)
{
tempvalue
= VARBITS(result)[MC_SEGMENT(bit, dim, n)];
tempvalue
|= (char)
(((toencode[dim] & (1 << bit)) >> bit)
<< MC_SEGMENTBIT(bit, dim, n));
VARBITS(result)[MC_SEGMENT(bit, dim, n)]
= tempvalue;
ereport(NOTICE, (errmsg("[%d,%d]=%d:%x", MC_SEGMENT(bit, dim, n), MC_SEGMENTBIT(bit,dim,n), (((toencode[dim] & (1 << bit)) >> bit)), (int)tempvalue)) );
}
/*** END OF TESTED CODE. ***/
PG_RETURN_VARBIT_P(result);
}
生成文件:
MODULE_big = pgmorton
OBJS = pgmorton.o
PGXS := $(shell pg_config --pgxs)
include $(PGXS)
Make 说明(在代码和 Makefile 所在的目录中):
make install
安装后,运行 来自 postgres 的 shell 使功能可用:
DROP FUNCTION IF EXISTS pg_morton_encode_integer(integer[], smallint);
CREATE FUNCTION pg_morton_encode_integer(integer[], smallint) RETURNS varbit
AS 'pgmorton', 'pg_morton_encode_integer'
LANGUAGE C STRICT;
一些产生明显错误输入的例子:
SELECT pg_morton_encode_integer('{2147483647, 2147483647, 2147483647}'::integer[], 31::smallint); -- gives '111111111111111111111111111111111111111111111111111111111111111111111111111111111111111100011' instead of '111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111'
SELECT pg_morton_encode_integer('{2147483647, 2147483647}'::integer[], 2::smallint); -- gives '0000' instead of '1111'
我在 OS X Yosemite 上使用 Postgres.app 版本 9.4.2.0 和默认的 clang 编译器完成所有这些工作。
编辑:正如在下面对我的问题的评论中所指出的,字节需要以不同的顺序写入,因为 Postgres 通过使最高有效字节在前来规范化位串字节顺序。我还另外发现,长度为 n
位的位串,其中 n
不是 8 的倍数(CHAR_BIT,一个字节中的位数)从最低有效字节。例如,如果我的位串是 14 位,那么需要 2 个字节(16 位)来存储位串,字节 0 是最重要的,字节 1 是最不重要的。我们从最低有效字节(即字节 1)中丢失了位 0 和 1(两个最低有效位)。(下图使用 B 表示使用的位,X 表示 t运行 位,以使上述内容清楚)。
byte 0 byte 1
=================================
7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0
B B B B B B B B | B B B B B B X X
还应该注意的是,t运行cated 位只能写入零,否则有 Postgres 崩溃的风险。 :)
这是修复第一个问题的更新代码(第二个关于位 t运行cation 我仍然没有解决),注意下面代码注释中更改的行:
#include <postgres.h>
#include <utils/array.h>
#include <utils/varbit.h>
#include <fmgr.h>
#include <limits.h>
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
#define MC_SEGMENT(bit, dim, ndim) ((((bit)*(ndim)) + dim) / CHAR_BIT)
#define MC_SEGMENTBIT(bit, dim, ndim) ((((bit)*(ndim)) + dim) % CHAR_BIT)
Datum pg_morton_encode_integer(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(pg_morton_encode_integer);
Datum
pg_morton_encode_integer(PG_FUNCTION_ARGS)
{
ArrayType *input_vector;
int32 *toencode;
int i, n, rlen, len;
int16 num_bits_per;
VarBit *result;
int mask;
int ipad;
char tempvalue;
int bit;
int dim;
int num_bits;
int num_bytes;
/* Ensure first argument is not null. */
if (PG_ARGISNULL(0))
{
ereport(ERROR, (errmsg("First operand must be non-null")));
}
/* Ensure second argument is not null. */
if (PG_ARGISNULL(1))
{
ereport(ERROR, (errmsg("Second operand must be non-null")));
}
/* Get input array and its length (first argument). */
input_vector = PG_GETARG_ARRAYTYPE_P(0);
n = (ARR_DIMS(input_vector))[0];
ereport(NOTICE, (errmsg("n=%d", n) ));
toencode = (int32 *)ARR_DATA_PTR(input_vector);
/* Get number of bits per dimensions (second argument). */
num_bits_per = PG_GETARG_INT16(1);
/* Allocated the VarBit. */
len = n*num_bits_per;
rlen = VARBITTOTALLEN(len);
result = palloc0(rlen);
SET_VARSIZE(result, rlen);
VARBITLEN(result)
= len;
/* Perform the morton encoding. */
num_bits = num_bits_per*n;
num_bytes = (num_bits/8) + (num_bits % 8 == 0 ? 0 : 1);
/*** TESTED THIS CODE, IT WORKS, BUT NOT IN POSTGRES. :( ***/
for (bit = 0; bit < num_bits_per; ++bit)
for (dim = 0; dim < n; ++dim)
{
// CHANGE!
tempvalue
= VARBITS(result)[num_bytes - 1 - MC_SEGMENT(bit, dim, n)];
tempvalue
|= (char)
(((toencode[dim] & (1 << bit)) >> bit)
<< MC_SEGMENTBIT(bit, dim, n));
// CHANGE!
VARBITS(result)[num_bytes - 1 - MC_SEGMENT(bit, dim, n)]
= tempvalue;
ereport(NOTICE, (errmsg("[%d,%d]=%d:%x", MC_SEGMENT(bit, dim, n), MC_SEGMENTBIT(bit,dim,n), (((toencode[dim] & (1 << bit)) >> bit)), (int)tempvalue)) );
}
/*** END OF TESTED CODE. ***/
PG_RETURN_VARBIT_P(result);
}
最简单的解决方案(尽管不一定是最有效的)是对我们最初将位写入的超大位串执行子串操作。这实际上会 "shift" 为您自动将位转换为新的位串,这样您就不会将非零值写入截断的位中。
不幸的是,Postgres 的位串底层子串函数被声明为 static
(参见 bitsubstring
中的函数 https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/varbit.c)。快速修复是将整个函数复制到您的代码中。工作解决方案如下:
#include <postgres.h>
#include <utils/array.h>
#include <utils/varbit.h>
#include <fmgr.h>
#include <limits.h>
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
// Copy bitstring substring function here.
#define MC_SEGMENT(bit, dim, ndim) ((((bit)*(ndim)) + dim) / CHAR_BIT)
#define MC_SEGMENTBIT(bit, dim, ndim) ((((bit)*(ndim)) + dim) % CHAR_BIT)
#define ROUNDUP_MULT_IS_POW_2(toRound, mult) ((toRound) + (mult) - 1) & ~((mult) - 1)
Datum pg_morton_encode_integer(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(pg_morton_encode_integer);
Datum
pg_morton_encode_integer(PG_FUNCTION_ARGS)
{
ArrayType *input_vector;
int32 *toencode;
int i, n, rlen, len;
int16 num_bits_per;
VarBit *temp, *result;
int mask;
int ipad;
char tempvalue;
int bit;
int dim;
int num_bits;
int num_bytes;
/* Ensure first argument is not null. */
if (PG_ARGISNULL(0))
{
ereport(ERROR, (errmsg("First operand must be non-null")));
}
/* Ensure second argument is not null. */
if (PG_ARGISNULL(1))
{
ereport(ERROR, (errmsg("Second operand must be non-null")));
}
/* Get input array and its length (first argument). */
input_vector = PG_GETARG_ARRAYTYPE_P(0);
n = (ARR_DIMS(input_vector))[0];
toencode = (int32 *)ARR_DATA_PTR(input_vector);
/* Get number of bits per dimensions (second argument). */
num_bits_per = PG_GETARG_INT16(1);
/* Allocated the VarBit. */
len = n*num_bits_per;
len = ROUNDUP_MULT_IS_POW_2(len, CHAR_BIT);
rlen = VARBITTOTALLEN(len);
result = palloc0(rlen);
SET_VARSIZE(temp, rlen);
VARBITLEN(temp) = len;
/* Perform the morton encoding. */
num_bits = num_bits_per*n;
num_bytes = (num_bits/8) + (num_bits % 8 == 0 ? 0 : 1);
for (bit = 0; bit < num_bits_per; ++bit)
for (dim = 0; dim < n; ++dim)
{
tempvalue
= VARBITS(temp)[num_bytes - 1 - MC_SEGMENT(bit, dim, n)];
tempvalue
|= (char)
(((toencode[dim] & (1 << bit)) >> bit)
<< MC_SEGMENTBIT(bit, dim, n));
VARBITS(temp)[num_bytes - 1 - MC_SEGMENT(bit, dim, n)]
= tempvalue;
}
if (len == num_bits)
{
result = temp;
}
else
{
result = bitsubstring(temp, len - num_bits+1, num_bits, false);
pfree(temp);
}
PG_RETURN_VARBIT_P(result);
}
底层 varbit
表示首先填充高位,因此例如 1111
将存储为字节 11110000
。您的实现从填充最后一个字节中的低位开始(因为在第一次迭代中您的移位距离全为零),并且这些位超出任何非 8 的倍数的范围。
按顺序处理输出位可能更简单,至少在概念上如此,尽管偏移量计算变得不那么直接。下面的实现似乎有效。请注意,我正在向后遍历输入数组;我看到的所有 sources 从最后一个坐标绘制第一位,但我不确定这是否是您原始代码中的意图。
Datum
pg_morton_encode_integer(PG_FUNCTION_ARGS)
{
ArrayType *input_vector;
int32 *toencode;
int16 num_bits_per;
VarBit *result;
int n, rlen, num_bits;
int bit, dim;
int in_bitpos, out_bitnum, out_bytenum, out_bitpos;
bits8 in_bitval;
/* Get input array and its length (first argument). */
input_vector = PG_GETARG_ARRAYTYPE_P(0);
n = (ARR_DIMS(input_vector))[0];
toencode = (int32 *)ARR_DATA_PTR(input_vector);
/* Get number of bits per dimensions (second argument). */
num_bits_per = PG_GETARG_INT16(1);
/* Allocated the VarBit. */
num_bits = n * num_bits_per;
rlen = VARBITTOTALLEN(num_bits);
result = palloc0(rlen);
SET_VARSIZE(result, rlen);
VARBITLEN(result) = num_bits;
/* Perform the morton encoding. */
for (bit = 0; bit < num_bits_per; ++bit)
{
in_bitpos = num_bits_per - 1 - bit;
for (dim = 0; dim < n; ++dim)
{
in_bitval = (toencode[n - dim - 1] & (1 << in_bitpos)) >> in_bitpos;
out_bitnum = bit * n + dim;
out_bytenum = out_bitnum / CHAR_BIT;
out_bitpos = CHAR_BIT - 1 - (out_bitnum % CHAR_BIT);
VARBITS(result)[out_bytenum] |= in_bitval << out_bitpos;
}
}
PG_RETURN_VARBIT_P(result);
}
这是一个简单的测试用例,它在 wiki page 上重现了 table。
SELECT
pg_morton_encode_integer(ARRAY[y], 3::INT2) AS y,
pg_morton_encode_integer(ARRAY[0,y], 3::INT2) AS x_000,
pg_morton_encode_integer(ARRAY[1,y], 3::INT2) AS x_001,
pg_morton_encode_integer(ARRAY[2,y], 3::INT2) AS x_010,
pg_morton_encode_integer(ARRAY[3,y], 3::INT2) AS x_011,
pg_morton_encode_integer(ARRAY[4,y], 3::INT2) AS x_100,
pg_morton_encode_integer(ARRAY[5,y], 3::INT2) AS x_101,
pg_morton_encode_integer(ARRAY[6,y], 3::INT2) AS x_110,
pg_morton_encode_integer(ARRAY[7,y], 3::INT2) AS x_111
FROM generate_series(0,7) s(y)