Beam PAssert 弄乱了行

Beam PAssert messes up the Row

我正在探索使用 Beam 进行测试并遇到了一个奇怪的问题。

我的驱动程序按预期工作,但它的测试失败并出现如下错误:

Expected: iterable with items [<Row: 
project_id:count
count_in:2
count_out:0
type:null
window_max_ts:86399999
>] in any order
     but: not matched: <Row: 
project_id:p1
count_in:2
count_out:0
type:count
window_max_ts:86399999
>

这是我的 PAssert 代码:

PAssert
            .that(output)
            .inWindow(window)
            .containsInAnyOrder(
                Row
                    .withSchema(OUTPUT_SCHEMA)
                    .withFieldValue("type", "count")
                    .withFieldValue("count_in", 2L)
                    .withFieldValue("count_out", 0L)
                    .withFieldValue(AddWindowTimestamp.TIMESTAMP_FIELD, window.maxTimestamp().getMillis())
                    .build()
            );

在管道的最后一步,我记录了相关元素。

[direct-runner-worker] DEBUG co.botanalytics.data.processing.beam.transforms.Log - Window: [maxTimestamp=1970-01-01T23:59:59.999Z], Pane: [PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}], Element: Row: 
project_id:p1
count_in:2
count_out:0
type:count
window_max_ts:86399999

这是预期的结果。

当我调试测试时,问题归结为来自 Beam Java SDK 的 CoderUtils

经过CoderUtils编解码后,产生了完全不同的预期Row。它的所有字段都乱七八糟,因此 PAssert 失败。

我想知道是否有解决这个问题的办法。任何建议都非常受欢迎。

提前致谢!

OUTPUT_SCHEMA定义:

private static final transient Schema SCHEMA = Schema
            .builder()
            .addStringField("project_id")
            .addNullableField("type", Schema.FieldType.STRING)
            .addInt64Field("count_in")
            .addInt64Field("count_out")
            .build();

代码可以正常运行,测试失败。我相信这是由于 PAssert 定义错误造成的。

  • 在测试行定义中添加项目标签.withFieldValue("project_id", "p1"),可能会解决交叉参数的问题

  • 对于错误 Expected: iterable with items [<Row: ... >] in any order but: not matched:,请提供 output 变量作为 Array of Rows,而不是仅提供一个 Row。它期待一个 array 但只收到一个 Row.

您的最终代码将如下所示:

// just an example to convert to array, choose any suitable way for you
    Foo[] array = new Foo[output.size()];
    output.toArray(array);

        PAssert
                    .that(output)
                    .inWindow(window)
                    .containsInAnyOrder(
                        Row
                            .withSchema(OUTPUT_SCHEMA)
                            .withFieldValue("project_id", "p1")
                            .withFieldValue("type", "count")
                            .withFieldValue("count_in", 2L)
                            .withFieldValue("count_out", 0L)
                            .withFieldValue(AddWindowTimestamp.TIMESTAMP_FIELD, window.maxTimestamp().getMillis())
                            .build()
                    );