什么是用于可拆分 DoFn 读取无界 Iterable 的正确 RestrictionT?
What is a correct RestrictionT to use for Splittable DoFn reading an unbounded Iterable?
我正在编写一个 Splittable DoFn 来读取 MongoDB 更改流。
它允许我观察描述集合更改的事件,并且我可以在我想要的任意集群时间戳处开始读取,前提是 oplog 有足够的历史记录。
集群时间戳是自纪元以来的秒数与给定秒内操作的序列号相结合。
我已经查看了 SDF 的其他示例,但到目前为止我所看到的都是
假设一个“可搜索的”数据源(Kafka 主题分区、Parquet/Avro 文件等)
MongoDB暴露的接口是一个简单的Iterable,所以我真的无法seek
精确偏移
(除了在时间戳之后开始获得一个新的 Iterable 之外),并且它产生的事件只有集群时间戳——同样,没有与输出元素关联的精确偏移量。
为了配置 SDF,我使用以下 class 作为我的输入元素类型:
public static class StreamConfig implements Serializable {
public final String databaseName;
public final String collectionName;
public final Instant startFrom;
...
}
作为限制,我使用 OffsetRange
因为我可以将这些时间戳转换为 Long 值并返回。
对于偏移跟踪器,我选择了 GrowableOffsetRangeTracker,因为这一次可以处理潜在的无限范围。
我在提出范围结束估计器时遇到了问题 - 最后我假设 now()
会是
自从我们最快读取流以来的最大潜在时间戳是实时的。
@GetInitialRestriction
public OffsetRange getInitialRestriction(@Element StreamConfig element) {
final int fromEpochSecond =
(int) (Optional.ofNullable(element.startFrom).orElse(Instant.now()).getMillis() / 1000);
final BsonTimestamp bsonTimestamp = new BsonTimestamp(fromEpochSecond, 0);
return new OffsetRange(bsonTimestamp.getValue(), Long.MAX_VALUE);
}
@NewTracker
public GrowableOffsetRangeTracker newTracker(@Restriction OffsetRange restriction) {
return new GrowableOffsetRangeTracker(restriction.getFrom(), new MongoChangeStreamEstimator());
}
public static class MongoChangeStreamEstimator implements RangeEndEstimator {
@Override
public long estimate() {
// estimating the range to current timestamp since we're reading them in real-time
return new BsonTimestamp((int) (Instant.now().getMillis() / 1000L), Integer.MAX_VALUE)
.getValue();
}
}
在这种情况下是否有更好的限制类型选择 - 元素的无限流
有时间戳但没有指定偏移量?
此外,当在 DirectRunner 上 运行 时,此实现似乎消耗了很多 CPU - tryClaim
returns false
这似乎打开了很多新的迭代器。
有没有办法告诉 Beam 不要拆分限制或不那么积极地并行化此操作?
@ProcessElement
public ProcessContinuation process(
@Element StreamConfig element,
RestrictionTracker<OffsetRange, Long> tracker,
OutputReceiver<ChangeStreamDocument<BsonDocument>> outputReceiver) {
final BsonTimestamp restrictionStart =
new BsonTimestamp(tracker.currentRestriction().getFrom());
final MongoCollection<BsonDocument> collection = getCollection(element);
final ChangeStreamIterable<BsonDocument> iterable =
collection.watch().startAtOperationTime(restrictionStart);
final long restrictionEnd = tracker.currentRestriction().getTo();
try {
final MongoCursor<ChangeStreamDocument<BsonDocument>> iterator = iterable.iterator();
while (iterator.hasNext()) {
ChangeStreamDocument<BsonDocument> changeStreamDocument = iterator.next();
final BsonTimestamp clusterTime = changeStreamDocument.getClusterTime();
final long clusterTimeValue = clusterTime.getValue();
if (clusterTimeValue >= restrictionEnd) {
LOGGER.warn(
"breaking out: " + clusterTimeValue + " outside restriction " + restrictionEnd);
break;
}
if (!tracker.tryClaim(clusterTimeValue)) {
LOGGER.warn("failed to claim " + clusterTimeValue);
iterator.close();
return ProcessContinuation.stop();
}
final int epochSecondsClusterTs = clusterTime.getTime();
outputReceiver.outputWithTimestamp(
changeStreamDocument, Instant.ofEpochSecond(epochSecondsClusterTs));
}
} catch (MongoNodeIsRecoveringException | MongoChangeStreamException | MongoSocketException e) {
LOGGER.warn("Failed to open change stream, retrying", e);
return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(10L));
}
return ProcessContinuation.resume();
}
使用时间戳作为偏移量对于限制来说是一个非常好的事情,只要你能够保证你能够读取 all 个元素到给定的时间戳。 (上面的循环假定迭代器按时间戳顺序生成元素,具体来说,一旦您看到时间戳超出范围,您就可以退出循环,而不必担心迭代器后面部分的较早元素。)
至于为什么 tryClaim 经常失败,这可能是因为直接跑步者进行了相当激进的分裂:https://github.com/apache/beam/blob/release-2.33.0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178
我正在编写一个 Splittable DoFn 来读取 MongoDB 更改流。 它允许我观察描述集合更改的事件,并且我可以在我想要的任意集群时间戳处开始读取,前提是 oplog 有足够的历史记录。 集群时间戳是自纪元以来的秒数与给定秒内操作的序列号相结合。
我已经查看了 SDF 的其他示例,但到目前为止我所看到的都是 假设一个“可搜索的”数据源(Kafka 主题分区、Parquet/Avro 文件等)
MongoDB暴露的接口是一个简单的Iterable,所以我真的无法seek
精确偏移
(除了在时间戳之后开始获得一个新的 Iterable 之外),并且它产生的事件只有集群时间戳——同样,没有与输出元素关联的精确偏移量。
为了配置 SDF,我使用以下 class 作为我的输入元素类型:
public static class StreamConfig implements Serializable {
public final String databaseName;
public final String collectionName;
public final Instant startFrom;
...
}
作为限制,我使用 OffsetRange
因为我可以将这些时间戳转换为 Long 值并返回。
对于偏移跟踪器,我选择了 GrowableOffsetRangeTracker,因为这一次可以处理潜在的无限范围。
我在提出范围结束估计器时遇到了问题 - 最后我假设 now()
会是
自从我们最快读取流以来的最大潜在时间戳是实时的。
@GetInitialRestriction
public OffsetRange getInitialRestriction(@Element StreamConfig element) {
final int fromEpochSecond =
(int) (Optional.ofNullable(element.startFrom).orElse(Instant.now()).getMillis() / 1000);
final BsonTimestamp bsonTimestamp = new BsonTimestamp(fromEpochSecond, 0);
return new OffsetRange(bsonTimestamp.getValue(), Long.MAX_VALUE);
}
@NewTracker
public GrowableOffsetRangeTracker newTracker(@Restriction OffsetRange restriction) {
return new GrowableOffsetRangeTracker(restriction.getFrom(), new MongoChangeStreamEstimator());
}
public static class MongoChangeStreamEstimator implements RangeEndEstimator {
@Override
public long estimate() {
// estimating the range to current timestamp since we're reading them in real-time
return new BsonTimestamp((int) (Instant.now().getMillis() / 1000L), Integer.MAX_VALUE)
.getValue();
}
}
在这种情况下是否有更好的限制类型选择 - 元素的无限流 有时间戳但没有指定偏移量?
此外,当在 DirectRunner 上 运行 时,此实现似乎消耗了很多 CPU - tryClaim
returns false
这似乎打开了很多新的迭代器。
有没有办法告诉 Beam 不要拆分限制或不那么积极地并行化此操作?
@ProcessElement
public ProcessContinuation process(
@Element StreamConfig element,
RestrictionTracker<OffsetRange, Long> tracker,
OutputReceiver<ChangeStreamDocument<BsonDocument>> outputReceiver) {
final BsonTimestamp restrictionStart =
new BsonTimestamp(tracker.currentRestriction().getFrom());
final MongoCollection<BsonDocument> collection = getCollection(element);
final ChangeStreamIterable<BsonDocument> iterable =
collection.watch().startAtOperationTime(restrictionStart);
final long restrictionEnd = tracker.currentRestriction().getTo();
try {
final MongoCursor<ChangeStreamDocument<BsonDocument>> iterator = iterable.iterator();
while (iterator.hasNext()) {
ChangeStreamDocument<BsonDocument> changeStreamDocument = iterator.next();
final BsonTimestamp clusterTime = changeStreamDocument.getClusterTime();
final long clusterTimeValue = clusterTime.getValue();
if (clusterTimeValue >= restrictionEnd) {
LOGGER.warn(
"breaking out: " + clusterTimeValue + " outside restriction " + restrictionEnd);
break;
}
if (!tracker.tryClaim(clusterTimeValue)) {
LOGGER.warn("failed to claim " + clusterTimeValue);
iterator.close();
return ProcessContinuation.stop();
}
final int epochSecondsClusterTs = clusterTime.getTime();
outputReceiver.outputWithTimestamp(
changeStreamDocument, Instant.ofEpochSecond(epochSecondsClusterTs));
}
} catch (MongoNodeIsRecoveringException | MongoChangeStreamException | MongoSocketException e) {
LOGGER.warn("Failed to open change stream, retrying", e);
return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(10L));
}
return ProcessContinuation.resume();
}
使用时间戳作为偏移量对于限制来说是一个非常好的事情,只要你能够保证你能够读取 all 个元素到给定的时间戳。 (上面的循环假定迭代器按时间戳顺序生成元素,具体来说,一旦您看到时间戳超出范围,您就可以退出循环,而不必担心迭代器后面部分的较早元素。)
至于为什么 tryClaim 经常失败,这可能是因为直接跑步者进行了相当激进的分裂:https://github.com/apache/beam/blob/release-2.33.0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178