什么是用于可拆分 DoFn 读取无界 Iterable 的正确 RestrictionT?

What is a correct RestrictionT to use for Splittable DoFn reading an unbounded Iterable?

我正在编写一个 Splittable DoFn 来读取 MongoDB 更改流。 它允许我观察描述集合更改的事件,并且我可以在我想要的任意集群时间戳处开始读取,前提是 oplog 有足够的历史记录。 集群时间戳是自纪元以来的秒数与给定秒内操作的序列号相结合。

我已经查看了 SDF 的其他示例,但到目前为止我所看到的都是 假设一个“可搜索的”数据源(Kafka 主题分区、Parquet/Avro 文件等)

MongoDB暴露的接口是一个简单的Iterable,所以我真的无法seek精确偏移 (除了在时间戳之后开始获得一个新的 Iterable 之外),并且它产生的事件只有集群时间戳——同样,没有与输出元素关联的精确偏移量。

为了配置 SDF,我使用以下 class 作为我的输入元素类型:

  public static class StreamConfig implements Serializable {
    public final String databaseName;
    public final String collectionName;
    public final Instant startFrom;

  ...
  }

作为限制,我使用 OffsetRange 因为我可以将这些时间戳转换为 Long 值并返回。 对于偏移跟踪器,我选择了 GrowableOffsetRangeTracker,因为这一次可以处理潜在的无限范围。

我在提出范围结束估计器时遇到了问题 - 最后我假设 now() 会是 自从我们最快读取流以来的最大潜在时间戳是实时的。

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element StreamConfig element) {
    final int fromEpochSecond =
        (int) (Optional.ofNullable(element.startFrom).orElse(Instant.now()).getMillis() / 1000);
    final BsonTimestamp bsonTimestamp = new BsonTimestamp(fromEpochSecond, 0);
    return new OffsetRange(bsonTimestamp.getValue(), Long.MAX_VALUE);
  }

  @NewTracker
  public GrowableOffsetRangeTracker newTracker(@Restriction OffsetRange restriction) {
    return new GrowableOffsetRangeTracker(restriction.getFrom(), new MongoChangeStreamEstimator());
  }

  public static class MongoChangeStreamEstimator implements RangeEndEstimator {
    @Override
    public long estimate() {
      // estimating the range to current timestamp since we're reading them in real-time
      return new BsonTimestamp((int) (Instant.now().getMillis() / 1000L), Integer.MAX_VALUE)
          .getValue();
    }
  }

在这种情况下是否有更好的限制类型选择 - 元素的无限流 有时间戳但没有指定偏移量?

此外,当在 DirectRunner 上 运行 时,此实现似乎消耗了很多 CPU - tryClaim returns false 这似乎打开了很多新的迭代器。

有没有办法告诉 Beam 不要拆分限制或不那么积极地并行化此操作?


@ProcessElement
  public ProcessContinuation process(
      @Element StreamConfig element,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<ChangeStreamDocument<BsonDocument>> outputReceiver) {
    final BsonTimestamp restrictionStart =
        new BsonTimestamp(tracker.currentRestriction().getFrom());

    final MongoCollection<BsonDocument> collection = getCollection(element);

    final ChangeStreamIterable<BsonDocument> iterable =
        collection.watch().startAtOperationTime(restrictionStart);
    final long restrictionEnd = tracker.currentRestriction().getTo();

    try {

      final MongoCursor<ChangeStreamDocument<BsonDocument>> iterator = iterable.iterator();
      while (iterator.hasNext()) {
        ChangeStreamDocument<BsonDocument> changeStreamDocument = iterator.next();
        final BsonTimestamp clusterTime = changeStreamDocument.getClusterTime();
        final long clusterTimeValue = clusterTime.getValue();

        if (clusterTimeValue >= restrictionEnd) {
          LOGGER.warn(
              "breaking out: " + clusterTimeValue + " outside restriction " + restrictionEnd);
          break;
        }

        if (!tracker.tryClaim(clusterTimeValue)) {
          LOGGER.warn("failed to claim " + clusterTimeValue);
          iterator.close();
          return ProcessContinuation.stop();
        }

        final int epochSecondsClusterTs = clusterTime.getTime();

        outputReceiver.outputWithTimestamp(
            changeStreamDocument, Instant.ofEpochSecond(epochSecondsClusterTs));
      }
    } catch (MongoNodeIsRecoveringException | MongoChangeStreamException | MongoSocketException e) {

      LOGGER.warn("Failed to open change stream, retrying", e);
      return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(10L));
    }

    return ProcessContinuation.resume();
  }

使用时间戳作为偏移量对于限制来说是一个非常好的事情,只要你能够保证你能够读取 all 个元素到给定的时间戳。 (上面的循环假定迭代器按时间戳顺序生成元素,具体来说,一旦您看到时间戳超出范围,您就可以退出循环,而不必担心迭代器后面部分的较早元素。)

至于为什么 tryClaim 经常失败,这可能是因为直接跑步者进行了相当激进的分裂:https://github.com/apache/beam/blob/release-2.33.0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178