如何尽快限制 Apache Beam 中的 PCollection?

How to limit PCollection in Apache Beam as soon as possible?

我在 Google Cloud DataFlow(使用 Scio SDK)上使用 Apache Beam 2.28.0。我有一个很大的输入PCollection(有界),我想将它限制/采样到固定数量的元素,但我想尽快开始下游处理。

目前,当我的输入PCollection有例如20M 个元素,我想使用 https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/transforms/Sample.html#any-long-

将其限制为 1M
input.apply(Sample.<String>any(1000000))

等待20M的元素全部读取完毕,耗时较长

如何有效地将元素数量限制在固定大小并在达到限制后立即开始下游处理,丢弃其余的输入处理?

好的,所以我最初的解决方案是像这样使用 Stateful DoFn(我正在使用问题中提到的 Scio 的 Scala SDK):

import java.lang.{Long => JLong}

class MyLimitFn[T](limit: Long) extends DoFn[KV[String, T], KV[String, T]] {
  @StateId("count") private val count = StateSpecs.value[JLong]()

  @ProcessElement
  def processElement(context: DoFn[KV[String, T], KV[String, T]]#ProcessContext, @StateId("count") count: ValueState[JLong]): Unit = {
    val current = count.read()
    if(current < limit) {
      count.write(current + 1L)
      context.output(context.element())
    }
  }
}

这个解决方案的缺点是我需要在使用它之前向所有元素综合添加相同的键(例如一个空字符串)。到目前为止,它比 Sample.<>any().

快多了

我仍然期待看到更好/更高效的解决方案。