Writing via TextIO.write with Sessions windowing raises GroupByKey consumption exception

In Apache Beam 2.0.0, when I use Sessions windowing and write files via TextIO.write, the TextIO.write() call produces the following exception:

java.lang.IllegalStateException: GroupByKey must have a valid Window merge function. Invalid because: WindowFn has already been consumed by previous GroupByKey
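The message comes from a check on the WindowFn that a GroupByKey receives. As I understand Beam's behavior, a GroupByKey over a merging WindowFn such as Sessions marks that WindowFn as consumed for everything downstream. A later GroupByKey then rejects it, and Window.remerge() makes it usable again. Non-merging WindowFns such as SlidingWindows pass through unchanged. The sketch below is a simplified, hypothetical model of that bookkeeping in plain Java. WindowFnModel and GroupByKeyModel are made-up names, not Beam classes:

```java
/**
 * Hypothetical model (NOT Beam classes) of how a merging WindowFn is
 * marked as consumed by one GroupByKey and rejected by the next one.
 */
final class WindowFnModel {
    final String name;
    final boolean merging;
    final String invalidReason; // null while the WindowFn is still usable

    WindowFnModel(String name, boolean merging, String invalidReason) {
        this.name = name;
        this.merging = merging;
        this.invalidReason = invalidReason;
    }

    static WindowFnModel sessions() {
        return new WindowFnModel("Sessions", true, null);
    }

    static WindowFnModel slidingWindows() {
        return new WindowFnModel("SlidingWindows", false, null);
    }
}

final class GroupByKeyModel {
    /** Returns the WindowFn seen downstream of a GroupByKey, or throws on an invalid one. */
    static WindowFnModel apply(WindowFnModel input) {
        if (input.invalidReason != null) {
            throw new IllegalStateException(
                    "GroupByKey must have a valid Window merge function. Invalid because: "
                            + input.invalidReason);
        }
        // Non-merging WindowFns pass through unchanged; merging ones are marked consumed.
        return input.merging
                ? new WindowFnModel(input.name, true,
                        "WindowFn has already been consumed by previous GroupByKey")
                : input;
    }

    /** Models the effect of Window.remerge(): the merging WindowFn becomes usable again. */
    static WindowFnModel remerge(WindowFnModel input) {
        return new WindowFnModel(input.name, input.merging, null);
    }
}
```

Under this model, it matters that Count.perElement() is built on Combine.perKey, whose expansion includes a GroupByKey. In the pipeline above, that step would be the first consumer of the Sessions WindowFn. Any further GroupByKey downstream, for example one inside the write transform, would trip the check. SlidingWindows is non-merging, which is consistent with it succeeding.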

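The exception occurs even though there is no intervening GroupByKey that could have consumed the window. The code is included below: the main method illustrates the problem, and the file also contains a helper filename policy class for 2.0.0.
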
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;


public class TestSessionWindowToFile {
    /**
     * Support class: a filename policy for getting one file per window.
     * See https://github.com/apache/beam/blob/release-2.0.0/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
     */
    public static class LocalPerWindowFiles extends FileBasedSink.FilenamePolicy {
        private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();
        private final String prefix;

        public LocalPerWindowFiles(String prefix) {
            this.prefix = prefix;
        }

        public String filenamePrefixForWindow(IntervalWindow window) {
            return String.format("%s-%s-%s",
                    prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
        }

        @Override
        public ResourceId windowedFilename(
                ResourceId outputDirectory, WindowedContext context, String extension) {
            IntervalWindow window = (IntervalWindow) context.getWindow();
            String filename = String.format(
                    "%s-%s-of-%s%s",
                    filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
                    extension);
            return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        }

        @Override
        public ResourceId unwindowedFilename(
                ResourceId outputDirectory, Context context, String extension) {
            throw new UnsupportedOperationException("Unsupported.");
        }
    }


    /**
     * Creating a session windows and then asking TextIO to write the results leads to
     * "java.lang.IllegalStateException: GroupByKey must have a valid Window merge function.
     * Invalid because: WindowFn has already been consumed by previous GroupByKey"
     */
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        PCollection<String> input = p.apply(
                Create.timestamped(
                        TimestampedValue.of("this", new Instant(1)),
                        TimestampedValue.of("is", new Instant(2)),
                        TimestampedValue.of("a", new Instant(3)),
                        TimestampedValue.of("test", new Instant(4)),
                        TimestampedValue.of("test", new Instant(5)),
                        TimestampedValue.of("test", new Instant(50)),
                        TimestampedValue.of("test", new Instant(51)),
                        TimestampedValue.of("test", new Instant(52))
                )
        );

        PCollection<String> windowedInputs = input
                // session windowing fails:
                .apply(Window.into(Sessions.withGapDuration(Duration.millis(10))));
                // sliding windowing succeeds:
                //.apply(Window.into(SlidingWindows.of(new Duration(30)).every(new Duration(10))));

        // Invoke counting of elements so that sessioning is more clear
        PCollection<KV<String, Long>> counts =
                windowedInputs.apply(Count.perElement());
        PCollection<String> writeableStrings = counts.apply("Convert to text",
                ParDo.of(new DoFn<KV<String, Long>, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        String word = c.element().getKey();
                        Long count = c.element().getValue();
                        c.output(String.format("%s,%d", word, count));
                    }
                }));

        writeableStrings
                .apply(TextIO.write()
                        .to("i_am_ignored_given_filename_policy")
                        .withFilenamePolicy(new LocalPerWindowFiles("results/testSessionWindow"))
                        .withWindowedWrites()
                        .withNumShards(1)
        );
        p.run();
    }
}
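For reference, here is a stdlib-only sketch of the name that LocalPerWindowFiles builds: `<prefix>-<start>-<end>-<shard>-of-<numShards><extension>`. It substitutes java.time for Joda-Time and pins the zone to UTC, whereas the Joda formatter in the question prints in the JVM's default time zone because no zone is set. The helper PerWindowNameSketch and the ".txt" extension are hypothetical. This is a sketch of the naming scheme, not the policy itself:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper mirroring the string layout used by LocalPerWindowFiles. */
final class PerWindowNameSketch {
    // Same layout as Joda's ISODateTimeFormat.hourMinute() ("HH:mm"), fixed to UTC.
    static final DateTimeFormatter HOUR_MINUTE =
            DateTimeFormatter.ofPattern("HH:mm").withZone(ZoneOffset.UTC);
    // A finer pattern that keeps millisecond-scale windows distinct.
    static final DateTimeFormatter WITH_MILLIS =
            DateTimeFormatter.ofPattern("HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

    static String name(DateTimeFormatter fmt, String prefix, long startMillis, long endMillis,
                       int shard, int numShards, String extension) {
        // Equivalent of filenamePrefixForWindow(window).
        String windowPrefix = String.format("%s-%s-%s", prefix,
                fmt.format(Instant.ofEpochMilli(startMillis)),
                fmt.format(Instant.ofEpochMilli(endMillis)));
        // Equivalent of the format string in windowedFilename(...).
        return String.format("%s-%s-of-%s%s", windowPrefix, shard, numShards, extension);
    }
}
```

With the millisecond timestamps in this test pipeline, every window formats to the same hour and minute. Under an HH:mm pattern, the sessions [4,15) and [50,62) therefore map to the same file name, while a pattern with seconds and milliseconds keeps them apart. The policy above also uses only the window and shard information, not the element key.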

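I haven't seen any effect from experimenting with watermarks/triggering, timestamp combining, using Window.remerge() to clarify intent, or switching to Beam 2.1.0 (which includes a default filename policy that knows how to write both windowed and unwindowed files).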

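Logging shows that the sessions are constructed correctly, and sliding windows successfully produce output files (using variants such as .apply(Window.into(SlidingWindows.of(new Duration(30)).every(new Duration(10)))); in place of Sessions). This suggests that Sessions windowing combined with TextIO.write is either misconfigured or misbehaving.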
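To make the expected grouping concrete, the sketch below computes per-key sessions by hand for the input in main. Each element gets the proto-window [timestamp, timestamp + gap), and overlapping windows for the same key merge, as with Sessions.withGapDuration. It is plain Java with a hypothetical SessionSketch helper, not Beam's implementation, and it uses the 10 ms gap from the question in millisecond units:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of per-key session merging; mirrors Sessions semantics only. */
final class SessionSketch {
    /** Returns one "key,count [start,end)" entry per merged session, keys in sorted order. */
    static List<String> sessions(String[] keys, long[] timestamps, long gapMillis) {
        // Group timestamps by key; TreeMap keeps the key order deterministic.
        Map<String, List<Long>> byKey = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            byKey.computeIfAbsent(keys[i], k -> new ArrayList<>()).add(timestamps[i]);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Long>> e : byKey.entrySet()) {
            List<Long> ts = e.getValue();
            ts.sort(null); // natural order
            long start = ts.get(0);
            long end = ts.get(0) + gapMillis;
            long count = 0;
            for (long t : ts) {
                if (t < end) {
                    // Proto-window [t, t + gap) overlaps the current session: extend it.
                    end = Math.max(end, t + gapMillis);
                    count++;
                } else {
                    // Gap reached: close the current session and open a new one.
                    out.add(String.format("%s,%d [%d,%d)", e.getKey(), count, start, end));
                    start = t;
                    end = t + gapMillis;
                    count = 1;
                }
            }
            out.add(String.format("%s,%d [%d,%d)", e.getKey(), count, start, end));
        }
        return out;
    }

    public static void main(String[] args) {
        String[] words = {"this", "is", "a", "test", "test", "test", "test", "test"};
        long[] millis = {1, 2, 3, 4, 5, 50, 51, 52};
        // Prints: a,1 [3,13)  is,1 [2,12)  test,2 [4,15)  test,3 [50,62)  this,1 [1,11)
        sessions(words, millis, 10).forEach(System.out::println);
    }
}
```

For this input, the sketch yields five key/session groups, corresponding to one file per key + window start + window end.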

How can I modify this code so that it writes one text file per key + window start + window end grouping?

This is a bug in the WriteFiles transform. I've filed https://issues.apache.org/jira/browse/BEAM-3122. Unfortunately, I can't think of a workaround other than fixing the bug.