在 Apache Beam 中测试架构更新
Testing Schema Update in Apache Beam
我正在制作一个无限数据流管道,并希望确保新版本的架构与旧版本兼容,以便它可以不间断地更新。如果我的原始对象定义为:
@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojoV1 {
public final String bank;
public final double purchaseAmount;
@SchemaCreate
public TransactionPojoV1(String bank, double purchaseAmount) {
this.bank = bank;
this.purchaseAmount = purchaseAmount;
}
}
我想添加一个新字段fee
:
@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojoV2 {
public final String bank;
public final double purchaseAmount;
public final double fee;
@SchemaCreate
public TransactionPojoV2(String bank, double purchaseAmount, double fee) {
this.bank = bank;
this.purchaseAmount = purchaseAmount;
this.fee = fee
}
}
我如何编写测试来测试 TransactionPojoV2
是否可以从 TransactionPojoV1
解码?并确保行为符合预期。
以上可能无法通过此测试,不确定,但我想要一些类似的东西:
TransactionPojoV1 transactionPojoV1 = ...
byte[] encoded = Coder.encode(transactionPojoV1);
TransactionPojoV2 transactionPojoV2 = Coder.decode(encoded);
// Assert values are as expected.
我只是不知道该怎么做。
您可以从 Pipeline
的 SchemaRegistry
中获取 Coder
。对于问题中给出的类,你可以这样做:
@RunWith(JUnit4.class)
public class TransactionPojoTest {
@Rule
public TestPipeline p = TestPipeline.create();
@Test
public void testTransactionV1toV2() throws NoSuchSchemaException, IOException {
SchemaRegistry schemaRegistry = p.getSchemaRegistry();
Coder<TransactionPojoV1> transactionPojoV1Coder = schemaRegistry.getSchemaCoder(TransactionPojoV1.class);
Coder<TransactionPojoV2> transactionPojoV2Coder = schemaRegistry.getSchemaCoder(TransactionPojoV2.class);
TransactionPojoV1 inputTransaction = new TransactionPojoV1("Some Bank", 100.0);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
transactionPojoV1Coder.encode(inputTransaction, byteArrayOutputStream);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
TransactionPojoV2 outputTransaction = transactionPojoV2Coder.decode(byteArrayInputStream);
assertEquals(inputTransaction.bank, outputTransaction.bank);
assertEquals(inputTransaction.purchaseAmount, outputTransaction.purchaseAmount, 1e-15);
assertEquals(0D, outputTransaction.fee.doubleValue(), 1e-15);
}
}
为了通过,TransactionPojoV2
需要:
@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojoV2 {
public final String bank;
public final double purchaseAmount;
@Nullable public final Double fee;
@SchemaCreate
public TransactionPojoV2(String bank, double purchaseAmount, @Nullable Double fee) {
this.bank = bank;
this.purchaseAmount = purchaseAmount;
this.fee = fee != null ? fee : 0;
}
}
数据流运行器在这里可以更加宽松,因为它支持重新排序字段以确保更新兼容性。如果有新字段,它将把它们移到架构的“末尾”,并确保现有字段在新旧架构中的顺序相同。
作为 Beam 用户,这一切对您来说都是透明的,这只是意味着只要您的新架构相对于旧架构仅 添加 字段,就应该更新-兼容。
要测试这种更新兼容性,我建议您使用这样的东西:
Schema v1 = schemaRegistry.getSchema(TransactionPojoV1.class);
Schema v2 = schemaRegistry.getSchema(TransactionPojoV2.class);
Set<Field> removedFields = Sets.difference(ImmutableSet.of(v1.getFields()),
ImmutableSet.of(v2.getFields()));
assertEmpty(removedFields);
注意这只是即兴发挥,我还没有测试过。它还依赖于 Guava 中的 Sets 和 ImmutableSet。
此外,值得注意的是 Dataflow 在实践中可能比这更宽松。 Dataflow 只关心融合边界处的更新兼容性(即有 GroupByKey/shuffle 的地方)。
我正在制作一个无限数据流管道,并希望确保新版本的架构与旧版本兼容,以便它可以不间断地更新。如果我的原始对象定义为:
@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojoV1 {
public final String bank;
public final double purchaseAmount;
@SchemaCreate
public TransactionPojoV1(String bank, double purchaseAmount) {
this.bank = bank;
this.purchaseAmount = purchaseAmount;
}
}
我想添加一个新字段fee
:
@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojoV2 {
public final String bank;
public final double purchaseAmount;
public final double fee;
@SchemaCreate
public TransactionPojoV2(String bank, double purchaseAmount, double fee) {
this.bank = bank;
this.purchaseAmount = purchaseAmount;
this.fee = fee
}
}
我如何编写测试来测试 TransactionPojoV2
是否可以从 TransactionPojoV1
解码?并确保行为符合预期。
以上可能无法通过此测试,不确定,但我想要一些类似的东西:
TransactionPojoV1 transactionPojoV1 = ...
byte[] encoded = Coder.encode(transactionPojoV1);
TransactionPojoV2 transactionPojoV2 = Coder.decode(encoded);
// Assert values are as expected.
我只是不知道该怎么做。
您可以从 Pipeline
的 SchemaRegistry
中获取 Coder
。对于问题中给出的类,你可以这样做:
@RunWith(JUnit4.class)
public class TransactionPojoTest {
@Rule
public TestPipeline p = TestPipeline.create();
@Test
public void testTransactionV1toV2() throws NoSuchSchemaException, IOException {
SchemaRegistry schemaRegistry = p.getSchemaRegistry();
Coder<TransactionPojoV1> transactionPojoV1Coder = schemaRegistry.getSchemaCoder(TransactionPojoV1.class);
Coder<TransactionPojoV2> transactionPojoV2Coder = schemaRegistry.getSchemaCoder(TransactionPojoV2.class);
TransactionPojoV1 inputTransaction = new TransactionPojoV1("Some Bank", 100.0);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
transactionPojoV1Coder.encode(inputTransaction, byteArrayOutputStream);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
TransactionPojoV2 outputTransaction = transactionPojoV2Coder.decode(byteArrayInputStream);
assertEquals(inputTransaction.bank, outputTransaction.bank);
assertEquals(inputTransaction.purchaseAmount, outputTransaction.purchaseAmount, 1e-15);
assertEquals(0D, outputTransaction.fee.doubleValue(), 1e-15);
}
}
为了通过,TransactionPojoV2
需要:
@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojoV2 {
public final String bank;
public final double purchaseAmount;
@Nullable public final Double fee;
@SchemaCreate
public TransactionPojoV2(String bank, double purchaseAmount, @Nullable Double fee) {
this.bank = bank;
this.purchaseAmount = purchaseAmount;
this.fee = fee != null ? fee : 0;
}
}
数据流运行器在这里可以更加宽松,因为它支持重新排序字段以确保更新兼容性。如果有新字段,它将把它们移到架构的“末尾”,并确保现有字段在新旧架构中的顺序相同。
作为 Beam 用户,这一切对您来说都是透明的,这只是意味着只要您的新架构相对于旧架构仅 添加 字段,就应该更新-兼容。
要测试这种更新兼容性,我建议您使用这样的东西:
Schema v1 = schemaRegistry.getSchema(TransactionPojoV1.class);
Schema v2 = schemaRegistry.getSchema(TransactionPojoV2.class);
Set<Field> removedFields = Sets.difference(ImmutableSet.of(v1.getFields()),
ImmutableSet.of(v2.getFields()));
assertEmpty(removedFields);
注意这只是即兴发挥,我还没有测试过。它还依赖于 Guava 中的 Sets 和 ImmutableSet。
此外,值得注意的是 Dataflow 在实践中可能比这更宽松。 Dataflow 只关心融合边界处的更新兼容性(即有 GroupByKey/shuffle 的地方)。