为什么我的 Postgres VarBit(C 语言函数)中的某些位针对特定输入被置零?

Why are some of the bits in my Postgres VarBit (C-language function) being zeroed for specific input?

我正在尝试为 Postgres 编写一个 C 语言函数,它接受一个整数数组并将整数转换为表示数组中整数的莫顿编码(z 曲线编码)的位串。传入两个参数:第一个是实际数组,第二个是每个整数要使用的位数,从最低有效位开始(最大可能是 31,因为我们必须使用无符号整数)。我正在使用我在 Postgres 之外测试过的实现(请参阅代码中的注释)并且我知道它可以工作。

代码(没有对整数、大小界限等进行一些明显必要的检查):

#include <postgres.h>
#include <utils/array.h>
#include <utils/varbit.h>
#include <fmgr.h>
#include <limits.h>

#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

#define MC_SEGMENT(bit, dim, ndim)  ((((bit)*(ndim)) + dim) / CHAR_BIT)
#define MC_SEGMENTBIT(bit, dim, ndim)   ((((bit)*(ndim)) + dim) % CHAR_BIT)

Datum pg_morton_encode_integer(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(pg_morton_encode_integer);
Datum
pg_morton_encode_integer(PG_FUNCTION_ARGS)
{
    ArrayType   *input_vector;
    int32       *toencode;
    int     i, n, rlen, len;
    int16       num_bits_per;
    VarBit      *result;
    int     mask;
    int     ipad;
    char        tempvalue;

    int     bit;
    int     dim;
    int     num_bits;
    int     num_bytes;

    /* Ensure first argument is not null. */
    if (PG_ARGISNULL(0))
    {
        ereport(ERROR, (errmsg("First operand must be non-null")));
    }
    /* Ensure second argument is not null. */
    if (PG_ARGISNULL(1))
    {
        ereport(ERROR, (errmsg("Second operand must be non-null")));
    }

    /* Get input array and its length (first argument). */
    input_vector    = PG_GETARG_ARRAYTYPE_P(0);
    n       = (ARR_DIMS(input_vector))[0];
ereport(NOTICE, (errmsg("n=%d", n) ));
    toencode    = (int32 *)ARR_DATA_PTR(input_vector);

    /* Get number of bits per dimensions (second argument). */
    num_bits_per    = PG_GETARG_INT16(1);

    /* Allocated the VarBit. */
    len     = n*num_bits_per;
    rlen        = VARBITTOTALLEN(len);
    result      = palloc0(rlen);
    SET_VARSIZE(result, rlen);
    VARBITLEN(result)
                = len;

    /* Perform the morton encoding. */
    num_bits        = num_bits_per*n;
    num_bytes       = (num_bits/8) + (num_bits % 8 == 0 ? 0 : 1);

    /*** TESTED THIS CODE, IT WORKS, BUT NOT IN POSTGRES. :( ***/
    for (bit = 0; bit < num_bits_per; ++bit)
    for (dim = 0; dim < n; ++dim)
    {
        tempvalue
            = VARBITS(result)[MC_SEGMENT(bit, dim, n)];
        tempvalue
            |= (char)
             (((toencode[dim] & (1 << bit)) >> bit)
              << MC_SEGMENTBIT(bit, dim, n));
        VARBITS(result)[MC_SEGMENT(bit, dim, n)]
            = tempvalue;
ereport(NOTICE, (errmsg("[%d,%d]=%d:%x", MC_SEGMENT(bit, dim, n), MC_SEGMENTBIT(bit,dim,n), (((toencode[dim] & (1 << bit)) >> bit)), (int)tempvalue)) );
    }
    /*** END OF TESTED CODE. ***/
    PG_RETURN_VARBIT_P(result);
}

生成文件:

MODULE_big = pgmorton
OBJS = pgmorton.o
PGXS := $(shell pg_config --pgxs)
include $(PGXS)

Make 说明(在代码和 Makefile 所在的目录中):

make install

安装后,运行 来自 postgres 的 shell 使功能可用:

DROP FUNCTION IF EXISTS pg_morton_encode_integer(integer[], smallint);
CREATE FUNCTION pg_morton_encode_integer(integer[], smallint) RETURNS varbit
  AS 'pgmorton', 'pg_morton_encode_integer'
  LANGUAGE C STRICT;

一些产生明显错误输入的例子:

SELECT pg_morton_encode_integer('{2147483647, 2147483647, 2147483647}'::integer[], 31::smallint); -- gives '111111111111111111111111111111111111111111111111111111111111111111111111111111111111111100011' instead of '111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111'
SELECT pg_morton_encode_integer('{2147483647, 2147483647}'::integer[], 2::smallint); -- gives '0000' instead of '1111'

我在 OS X Yosemite 上使用 Postgres.app 版本 9.4.2.0 和默认的 clang 编译器完成所有这些工作。

编辑:正如在下面对我的问题的评论中所指出的,字节需要以不同的顺序写入,因为 Postgres 通过使最高有效字节在前来规范化位串字节顺序。我还另外发现,长度为 n 位的位串,其中 n 不是 8 的倍数(CHAR_BIT,一个字节中的位数)从最低有效字节。例如,如果我的位串是 14 位,那么需要 2 个字节(16 位)来存储位串,字节 0 是最重要的,字节 1 是最不重要的。我们从最低有效字节(即字节 1)中丢失了位 0 和 1(两个最低有效位)。(下图使用 B 表示使用的位,X 表示 t运行 位,以使上述内容清楚)。

byte 0            byte 1
=================================
7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0
B B B B B B B B | B B B B B B X X

还应该注意的是,t运行cated 位只能写入零,否则有 Postgres 崩溃的风险。 :)

这是修复第一个问题的更新代码(第二个关于位 t运行cation 我仍然没有解决),注意下面代码注释中更改的行:

#include <postgres.h>
#include <utils/array.h>
#include <utils/varbit.h>
#include <fmgr.h>
#include <limits.h>

#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

#define MC_SEGMENT(bit, dim, ndim)  ((((bit)*(ndim)) + dim) / CHAR_BIT)
#define MC_SEGMENTBIT(bit, dim, ndim)   ((((bit)*(ndim)) + dim) % CHAR_BIT)

Datum pg_morton_encode_integer(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(pg_morton_encode_integer);
Datum
pg_morton_encode_integer(PG_FUNCTION_ARGS)
{
    ArrayType   *input_vector;
    int32       *toencode;
    int     i, n, rlen, len;
    int16       num_bits_per;
    VarBit      *result;
    int     mask;
    int     ipad;
    char        tempvalue;

    int     bit;
    int     dim;
    int     num_bits;
    int     num_bytes;

    /* Ensure first argument is not null. */
    if (PG_ARGISNULL(0))
    {
        ereport(ERROR, (errmsg("First operand must be non-null")));
    }
    /* Ensure second argument is not null. */
    if (PG_ARGISNULL(1))
    {
        ereport(ERROR, (errmsg("Second operand must be non-null")));
    }

    /* Get input array and its length (first argument). */
    input_vector    = PG_GETARG_ARRAYTYPE_P(0);
    n       = (ARR_DIMS(input_vector))[0];
ereport(NOTICE, (errmsg("n=%d", n) ));
    toencode    = (int32 *)ARR_DATA_PTR(input_vector);

    /* Get number of bits per dimensions (second argument). */
    num_bits_per    = PG_GETARG_INT16(1);

    /* Allocated the VarBit. */
    len     = n*num_bits_per;
    rlen        = VARBITTOTALLEN(len);
    result      = palloc0(rlen);
    SET_VARSIZE(result, rlen);
    VARBITLEN(result)
                = len;

    /* Perform the morton encoding. */
    num_bits        = num_bits_per*n;
    num_bytes       = (num_bits/8) + (num_bits % 8 == 0 ? 0 : 1);

    /*** TESTED THIS CODE, IT WORKS, BUT NOT IN POSTGRES. :( ***/
    for (bit = 0; bit < num_bits_per; ++bit)
    for (dim = 0; dim < n; ++dim)
    {
        // CHANGE!
        tempvalue
            = VARBITS(result)[num_bytes - 1 - MC_SEGMENT(bit, dim, n)];
        tempvalue
            |= (char)
             (((toencode[dim] & (1 << bit)) >> bit)
              << MC_SEGMENTBIT(bit, dim, n));
        // CHANGE!
        VARBITS(result)[num_bytes - 1 - MC_SEGMENT(bit, dim, n)]
            = tempvalue;
ereport(NOTICE, (errmsg("[%d,%d]=%d:%x", MC_SEGMENT(bit, dim, n), MC_SEGMENTBIT(bit,dim,n), (((toencode[dim] & (1 << bit)) >> bit)), (int)tempvalue)) );
    }
    /*** END OF TESTED CODE. ***/
    PG_RETURN_VARBIT_P(result);
}

最简单的解决方案(尽管不一定是最有效的)是对我们最初将位写入的超大位串执行子串操作。这实际上会 "shift" 为您自动将位转换为新的位串,这样您就不会将非零值写入截断的位中。

不幸的是,Postgres 的位串底层子串函数被声明为 static(参见 bitsubstring 中的函数 https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/varbit.c)。快速修复是将整个函数复制到您的代码中。工作解决方案如下:

#include <postgres.h>
#include <utils/array.h>
#include <utils/varbit.h>
#include <fmgr.h>
#include <limits.h>

#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

// Copy bitstring substring function here.

#define MC_SEGMENT(bit, dim, ndim)  ((((bit)*(ndim)) + dim) / CHAR_BIT)
#define MC_SEGMENTBIT(bit, dim, ndim)   ((((bit)*(ndim)) + dim) % CHAR_BIT)
#define ROUNDUP_MULT_IS_POW_2(toRound, mult) ((toRound) + (mult) - 1) & ~((mult) - 1)

Datum pg_morton_encode_integer(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(pg_morton_encode_integer);
Datum
pg_morton_encode_integer(PG_FUNCTION_ARGS)
{
    ArrayType   *input_vector;
    int32       *toencode;
    int     i, n, rlen, len;
    int16       num_bits_per;
    VarBit  *temp,  *result;
    int     mask;
    int     ipad;
    char        tempvalue;

    int     bit;
    int     dim;
    int     num_bits;
    int     num_bytes;

    /* Ensure first argument is not null. */
    if (PG_ARGISNULL(0))
    {
        ereport(ERROR, (errmsg("First operand must be non-null")));
    }
    /* Ensure second argument is not null. */
    if (PG_ARGISNULL(1))
    {
        ereport(ERROR, (errmsg("Second operand must be non-null")));
    }

    /* Get input array and its length (first argument). */
    input_vector    = PG_GETARG_ARRAYTYPE_P(0);
    n       = (ARR_DIMS(input_vector))[0];
    toencode    = (int32 *)ARR_DATA_PTR(input_vector);

    /* Get number of bits per dimensions (second argument). */
    num_bits_per    = PG_GETARG_INT16(1);

    /* Allocated the VarBit. */
    len     = n*num_bits_per;
    len             = ROUNDUP_MULT_IS_POW_2(len, CHAR_BIT);
    rlen        = VARBITTOTALLEN(len);
    result      = palloc0(rlen);
    SET_VARSIZE(temp, rlen);
    VARBITLEN(temp) = len;

    /* Perform the morton encoding. */
    num_bits        = num_bits_per*n;
    num_bytes   = (num_bits/8) + (num_bits % 8 == 0 ? 0 : 1);

    for (bit = 0; bit < num_bits_per; ++bit)
    for (dim = 0; dim < n; ++dim)
    {
        tempvalue
            = VARBITS(temp)[num_bytes - 1 - MC_SEGMENT(bit, dim, n)];
        tempvalue
            |= (char)
             (((toencode[dim] & (1 << bit)) >> bit)
              << MC_SEGMENTBIT(bit, dim, n));
        VARBITS(temp)[num_bytes - 1 - MC_SEGMENT(bit, dim, n)]
            = tempvalue;
    }

    if (len == num_bits)
    {
        result     = temp;
    }
    else
    {
        result     = bitsubstring(temp, len - num_bits+1, num_bits, false);
        pfree(temp);
    }
    PG_RETURN_VARBIT_P(result);
}

底层 varbit 表示首先填充高位,因此例如 1111 将存储为字节 11110000。您的实现从填充最后一个字节中的低位开始(因为在第一次迭代中您的移位距离全为零),并且这些位超出任何非 8 的倍数的范围。

按顺序处理输出位可能更简单,至少在概念上如此,尽管偏移量计算变得不那么直接。下面的实现似乎有效。请注意,我正在向后遍历输入数组;我看到的所有 sources 从最后一个坐标绘制第一位,但我不确定这是否是您原始代码中的意图。

Datum
pg_morton_encode_integer(PG_FUNCTION_ARGS)
{
    ArrayType   *input_vector;
    int32       *toencode;
    int16       num_bits_per;
    VarBit      *result;
    int         n, rlen, num_bits;
    int         bit, dim;
    int         in_bitpos, out_bitnum, out_bytenum, out_bitpos;
    bits8       in_bitval;

    /* Get input array and its length (first argument). */
    input_vector = PG_GETARG_ARRAYTYPE_P(0);
    n            = (ARR_DIMS(input_vector))[0];
    toencode     = (int32 *)ARR_DATA_PTR(input_vector);

    /* Get number of bits per dimensions (second argument). */
    num_bits_per = PG_GETARG_INT16(1);

    /* Allocated the VarBit. */
    num_bits = n * num_bits_per;
    rlen     = VARBITTOTALLEN(num_bits);
    result   = palloc0(rlen);
    SET_VARSIZE(result, rlen);
    VARBITLEN(result) = num_bits;

    /* Perform the morton encoding. */
    for (bit = 0; bit < num_bits_per; ++bit)
    {
        in_bitpos = num_bits_per - 1 - bit;
        for (dim = 0; dim < n; ++dim)
        {
            in_bitval = (toencode[n - dim - 1] & (1 << in_bitpos)) >> in_bitpos;
            out_bitnum = bit * n + dim;
            out_bytenum = out_bitnum / CHAR_BIT;
            out_bitpos = CHAR_BIT - 1 - (out_bitnum % CHAR_BIT);

            VARBITS(result)[out_bytenum] |= in_bitval << out_bitpos;
        }
    }

    PG_RETURN_VARBIT_P(result);
}

这是一个简单的测试用例,它在 wiki page 上重现了 table。

 SELECT
   pg_morton_encode_integer(ARRAY[y],   3::INT2) AS y,
   pg_morton_encode_integer(ARRAY[0,y], 3::INT2) AS x_000,
   pg_morton_encode_integer(ARRAY[1,y], 3::INT2) AS x_001,
   pg_morton_encode_integer(ARRAY[2,y], 3::INT2) AS x_010,
   pg_morton_encode_integer(ARRAY[3,y], 3::INT2) AS x_011,
   pg_morton_encode_integer(ARRAY[4,y], 3::INT2) AS x_100,
   pg_morton_encode_integer(ARRAY[5,y], 3::INT2) AS x_101,
   pg_morton_encode_integer(ARRAY[6,y], 3::INT2) AS x_110,
   pg_morton_encode_integer(ARRAY[7,y], 3::INT2) AS x_111
 FROM generate_series(0,7) s(y)