在 Apache Beam 中测试架构更新

Testing Schema Update in Apache Beam

我正在制作一个无限数据流管道,并希望确保新版本的架构与旧版本兼容,以便它可以不间断地更新。如果我的原始对象定义为:

@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojoV1 {
  public final String bank;
  public final double purchaseAmount;

  @SchemaCreate
  public TransactionPojoV1(String bank, double purchaseAmount) {
    this.bank = bank;
    this.purchaseAmount = purchaseAmount;
  }
}

我想添加一个新字段fee:

@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojoV2 {
  public final String bank;
  public final double purchaseAmount;
  public final double fee;

  @SchemaCreate
  public TransactionPojoV2(String bank, double purchaseAmount, double fee) {
    this.bank = bank;
    this.purchaseAmount = purchaseAmount;
    this.fee = fee
  }
}

我如何编写测试来测试 TransactionPojoV2 是否可以从 TransactionPojoV1 解码?并确保行为符合预期。

以上可能无法通过此测试,不确定,但我想要一些类似的东西:

TransactionPojoV1 transactionPojoV1 = ...

byte[] encoded = Coder.encode(transactionPojoV1);

TransactionPojoV2 transactionPojoV2 = Coder.decode(encoded);

// Assert values are as expected.

我只是不知道该怎么做。

您可以从 PipelineSchemaRegistry 中获取 Coder。对于问题中给出的类,你可以这样做:

@RunWith(JUnit4.class)
public class TransactionPojoTest {
    @Rule
    public TestPipeline p = TestPipeline.create();

    @Test
    public void testTransactionV1toV2() throws NoSuchSchemaException, IOException {
        SchemaRegistry schemaRegistry = p.getSchemaRegistry();

        Coder<TransactionPojoV1> transactionPojoV1Coder = schemaRegistry.getSchemaCoder(TransactionPojoV1.class);
        Coder<TransactionPojoV2> transactionPojoV2Coder = schemaRegistry.getSchemaCoder(TransactionPojoV2.class);

        TransactionPojoV1 inputTransaction = new TransactionPojoV1("Some Bank", 100.0);

        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        transactionPojoV1Coder.encode(inputTransaction, byteArrayOutputStream);

        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());

        TransactionPojoV2 outputTransaction = transactionPojoV2Coder.decode(byteArrayInputStream);

        assertEquals(inputTransaction.bank, outputTransaction.bank);
        assertEquals(inputTransaction.purchaseAmount, outputTransaction.purchaseAmount, 1e-15);
        assertEquals(0D, outputTransaction.fee.doubleValue(), 1e-15);
    }
}

为了通过,TransactionPojoV2 需要:

@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojoV2 {
    public final String bank;
    public final double purchaseAmount;
    @Nullable public final Double fee;

    @SchemaCreate
    public TransactionPojoV2(String bank, double purchaseAmount, @Nullable Double fee) {
        this.bank = bank;
        this.purchaseAmount = purchaseAmount;
        this.fee = fee != null ? fee : 0;
    }
}

数据流运行器在这里可以更加宽松,因为它支持重新排序字段以确保更新兼容性。如果有新字段,它将把它们移到架构的“末尾”,并确保现有字段在新旧架构中的顺序相同。

作为 Beam 用户,这一切对您来说都是透明的,这只是意味着只要您的新架构相对于旧架构仅 添加 字段,就应该更新-兼容。

要测试这种更新兼容性,我建议您使用这样的东西:

Schema v1 = schemaRegistry.getSchema(TransactionPojoV1.class);
Schema v2 = schemaRegistry.getSchema(TransactionPojoV2.class);

Set<Field> removedFields = Sets.difference(ImmutableSet.of(v1.getFields()), 
                                           ImmutableSet.of(v2.getFields()));
assertEmpty(removedFields);

注意这只是即兴发挥,我还没有测试过。它还依赖于 Guava 中的 Sets 和 ImmutableSet。

此外,值得注意的是 Dataflow 在实践中可能比这更宽松。 Dataflow 只关心融合边界处的更新兼容性(即有 GroupByKey/shuffle 的地方)。