如何在 python apache beam 中对 window 中的元素进行排序?
How can I order elements in a window in python apache beam?
我注意到 java apache beam 具有 class groupby.sortbytimestamp python 是否实现了该功能?如果不是,那么在 window 中对元素进行排序的方式是什么?我想我可以在 DoFn 中对整个 window 进行排序,但我想知道是否有更好的方法。
Beam 中目前没有内置值排序(在 Python 或 Java 中)。现在,最好的选择是像你提到的那样在 DoFn 中自己对值进行排序。
这是一个使用 CombineFn 的解决方案。它具有使用 TreeSet 删除重复数据的额外好处。您还应该确保 window 的数据足够小以适合单个 worker 的内存。
public static class DedupAndSortByTime extends Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> {
@Override
public TreeSet<MarketData> createAccumulator() {
return new TreeSet<>(Comparator
.comparingLong(MarketData::getEventTime)
.thenComparing(MarketData::getOrderbookType));
}
@Override
public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) {
accum.add(input);
return accum;
}
@Override
public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) {
TreeSet<MarketData> merged = createAccumulator();
for (TreeSet<MarketData> accum : accums) {
merged.addAll(accum);
}
return merged;
}
@Override
public List<MarketData> extractOutput(TreeSet<MarketData> accum) {
return Lists.newArrayList(accum.iterator());
}
}
我注意到 java apache beam 具有 class groupby.sortbytimestamp python 是否实现了该功能?如果不是,那么在 window 中对元素进行排序的方式是什么?我想我可以在 DoFn 中对整个 window 进行排序,但我想知道是否有更好的方法。
Beam 中目前没有内置值排序(在 Python 或 Java 中)。现在,最好的选择是像你提到的那样在 DoFn 中自己对值进行排序。
这是一个使用 CombineFn 的解决方案。它具有使用 TreeSet 删除重复数据的额外好处。您还应该确保 window 的数据足够小以适合单个 worker 的内存。
public static class DedupAndSortByTime extends Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> {
@Override
public TreeSet<MarketData> createAccumulator() {
return new TreeSet<>(Comparator
.comparingLong(MarketData::getEventTime)
.thenComparing(MarketData::getOrderbookType));
}
@Override
public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) {
accum.add(input);
return accum;
}
@Override
public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) {
TreeSet<MarketData> merged = createAccumulator();
for (TreeSet<MarketData> accum : accums) {
merged.addAll(accum);
}
return merged;
}
@Override
public List<MarketData> extractOutput(TreeSet<MarketData> accum) {
return Lists.newArrayList(accum.iterator());
}
}