用于 Beam 基准测试的即时数据生成
On-the-fly data generation for benchmarking Beam
我的目标是在具有不同 window 查询的流数据用例上对 Apache Beam 的延迟和吞吐量进行基准测试。
我想使用即时数据生成器创建自己的数据,以手动控制数据生成速率,并直接从没有 pub/sub 机制的管道中使用这些数据,即我不想要从代理等读取数据以避免瓶颈。
有没有办法做一些类似于我想要实现的事情?或者 Beam SDK 是否有此类用例的源代码?
到目前为止我找不到起点,现有的代码示例使用 pub/sub 机制并且他们假设数据来自某个地方。
感谢您提前提出建议。
关于即时数据,一种选择是使用 GenerateSequence,例如:
pipeline.apply(GenerateSequence.from(0).withRate(RATE,Duration.millis(1000)))
要创建其他类型的对象,您可以在消耗 Long 并将其变成其他对象后使用 ParDo:
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
p.apply(GenerateSequence.from(0).withRate(2, Duration.millis(1000)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(FlatMapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via(i -> IntStream.range(0,2).mapToObj(k -> KV.of(String.format("Gen Value %s" , i),String.format("FlatMap Value %s ", k))).collect(Collectors.toList())))
.apply(ParDo.of(new DoFn<KV<String,String>, String>() {
@ProcessElement
public void process(@Element KV<String,String> input){
LOG.info("Value was {}", input);
}
}));
p.run();
这应该生成如下值:
Value was KV{Gen Value 0, FlatMap Value 0 }
Value was KV{Gen Value 0, FlatMap Value 1 }
Value was KV{Gen Value 1, FlatMap Value 0 }
Value was KV{Gen Value 1, FlatMap Value 1 }
Value was KV{Gen Value 2, FlatMap Value 0 }
Value was KV{Gen Value 2, FlatMap Value 1 }
管道性能测试需要记住的其他一些事项:
Direct runner 专为单元测试而设计,它可以做很酷的事情,例如模拟故障,这有助于发现在 运行 生产管道中会出现的问题。但是,它并非旨在帮助进行性能测试。我建议始终对这些类型的集成测试使用主要运行器。
请注意融合优化Link to Docs, when using a artificial data source like GenerateSequence you may need to do a GBK as the next step to allow the work to be parallelized. For the Dataflow runner more info can be found here: Link to Docs
一般来说,对于性能测试,我建议测试整个端到端管道。与源和汇(例如水印)的交互将不会在独立管道中进行测试。
希望对您有所帮助。
我的目标是在具有不同 window 查询的流数据用例上对 Apache Beam 的延迟和吞吐量进行基准测试。
我想使用即时数据生成器创建自己的数据,以手动控制数据生成速率,并直接从没有 pub/sub 机制的管道中使用这些数据,即我不想要从代理等读取数据以避免瓶颈。 有没有办法做一些类似于我想要实现的事情?或者 Beam SDK 是否有此类用例的源代码? 到目前为止我找不到起点,现有的代码示例使用 pub/sub 机制并且他们假设数据来自某个地方。
感谢您提前提出建议。
关于即时数据,一种选择是使用 GenerateSequence,例如:
pipeline.apply(GenerateSequence.from(0).withRate(RATE,Duration.millis(1000)))
要创建其他类型的对象,您可以在消耗 Long 并将其变成其他对象后使用 ParDo:
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
p.apply(GenerateSequence.from(0).withRate(2, Duration.millis(1000)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(FlatMapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via(i -> IntStream.range(0,2).mapToObj(k -> KV.of(String.format("Gen Value %s" , i),String.format("FlatMap Value %s ", k))).collect(Collectors.toList())))
.apply(ParDo.of(new DoFn<KV<String,String>, String>() {
@ProcessElement
public void process(@Element KV<String,String> input){
LOG.info("Value was {}", input);
}
}));
p.run();
这应该生成如下值:
Value was KV{Gen Value 0, FlatMap Value 0 }
Value was KV{Gen Value 0, FlatMap Value 1 }
Value was KV{Gen Value 1, FlatMap Value 0 }
Value was KV{Gen Value 1, FlatMap Value 1 }
Value was KV{Gen Value 2, FlatMap Value 0 }
Value was KV{Gen Value 2, FlatMap Value 1 }
管道性能测试需要记住的其他一些事项:
Direct runner 专为单元测试而设计,它可以做很酷的事情,例如模拟故障,这有助于发现在 运行 生产管道中会出现的问题。但是,它并非旨在帮助进行性能测试。我建议始终对这些类型的集成测试使用主要运行器。
请注意融合优化Link to Docs, when using a artificial data source like GenerateSequence you may need to do a GBK as the next step to allow the work to be parallelized. For the Dataflow runner more info can be found here: Link to Docs
一般来说,对于性能测试,我建议测试整个端到端管道。与源和汇(例如水印)的交互将不会在独立管道中进行测试。
希望对您有所帮助。