用于 Beam 基准测试的即时数据生成

On-the-fly data generation for benchmarking Beam

我的目标是在具有不同 window 查询的流数据用例上对 Apache Beam 的延迟和吞吐量进行基准测试。

我想使用即时数据生成器创建自己的数据,以手动控制数据生成速率,并直接从没有 pub/sub 机制的管道中使用这些数据,即我不想要从代理等读取数据以避免瓶颈。 有没有办法做一些类似于我想要实现的事情?或者 Beam SDK 是否有此类用例的源代码? 到目前为止我找不到起点,现有的代码示例使用 pub/sub 机制并且他们假设数据来自某个地方。

感谢您提前提出建议。

关于即时数据,一种选择是使用 GenerateSequence,例如:

pipeline.apply(GenerateSequence.from(0).withRate(RATE,Duration.millis(1000)))

要创建其他类型的对象,您可以在消耗 Long 并将其变成其他对象后使用 ParDo:

Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    p.apply(GenerateSequence.from(0).withRate(2, Duration.millis(1000)))
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
    .apply(FlatMapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
        .via(i -> IntStream.range(0,2).mapToObj(k -> KV.of(String.format("Gen Value %s" , i),String.format("FlatMap Value %s ", k))).collect(Collectors.toList())))
    .apply(ParDo.of(new DoFn<KV<String,String>, String>() {
      @ProcessElement
      public void process(@Element KV<String,String> input){
        LOG.info("Value was {}", input);
      }
    }));
p.run();

这应该生成如下值:

Value was KV{Gen Value 0, FlatMap Value 0 }
Value was KV{Gen Value 0, FlatMap Value 1 }
Value was KV{Gen Value 1, FlatMap Value 0 }
Value was KV{Gen Value 1, FlatMap Value 1 }
Value was KV{Gen Value 2, FlatMap Value 0 }
Value was KV{Gen Value 2, FlatMap Value 1 }

管道性能测试需要记住的其他一些事项:

  • Direct runner 专为单元测试而设计,它可以做很酷的事情,例如模拟故障,这有助于发现在 运行 生产管道中会出现的问题。但是,它并非旨在帮助进行性能测试。我建议始终对这些类型的集成测试使用主要运行器。

  • 请注意融合优化Link to Docs, when using a artificial data source like GenerateSequence you may need to do a GBK as the next step to allow the work to be parallelized. For the Dataflow runner more info can be found here: Link to Docs

  • 一般来说,对于性能测试,我建议测试整个端到端管道。与源和汇(例如水印)的交互将不会在独立管道中进行测试。

希望对您有所帮助。