SpannerIO java.lang.IllegalStateException:这里的排序器应该为空

SpannerIO java.lang.IllegalStateException: Sorter should be null here

我正在尝试从 Dataflow 流式作业写入 Spanner,使用的依赖如下:
<!-- Apache Beam Google Cloud Platform IO module, pinned to version 2.18.0. -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>2.18.0</version>
</dependency>

将数据映射为 PCollection<Mutation> 后,我通过 SpannerIO.write 将它们写入 Spanner:
// Create the pipeline from the already-parsed options.
Pipeline pipeline = Pipeline.create(options);

// The upstream transforms that produce the mutations are elided in this snippet.
PCollection<Mutation> mutations = pipeline.apply...

// Hand the mutations to SpannerIO, targeting the given instance and database.
mutations.apply("WriteMutations", SpannerIO.write()
                .withInstanceId(INSTANCE_ID)
                .withDatabaseId(DATABASE_ID)
);

// Launch the pipeline.
pipeline.run();

但是,它抛出了以下异常:

java.lang.IllegalStateException: Sorter should be null here
        at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$GatherBundleAndSortFn.startBundle (SpannerIO.java:1080)

这个异常的原因是什么?

以下管道会产生该异常。我使用 20 个 worker(工作节点)对其进行了测试,但问题看起来与数据负载无关。

import com.google.cloud.spanner.Mutation;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

import java.util.UUID;

public final class TestPipeline {

    private static final Duration WINDOW_DURATION = Duration.standardSeconds(1);
    private static final String DATABASE_ID = "test";
    private static final String INSTANCE_ID = "test-spanner";
    private static final String TEST_TABLE = "test";

    public static void main(String[] args) {
        TestPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(TestPipelineOptions.class);

        Pipeline pipeline = Pipeline.create(options);

        pipeline
                .apply("Read pubsub", PubsubIO.readMessagesWithAttributes()
                        .fromSubscription(options.getInputSubscription()))
                .apply("Parse message", ParDo.of(new ProcessMessage()))
                .apply("Windowing", Window.<Mutation>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(
                                AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(WINDOW_DURATION)))
                        .withAllowedLateness(Duration.ZERO)
                        .discardingFiredPanes())
                .apply("Write mutations", SpannerIO.write()
                        .withInstanceId(INSTANCE_ID)
                        .withDatabaseId(DATABASE_ID)
                );

        pipeline.run();
    }

    private static class ProcessMessage extends DoFn<PubsubMessage, Mutation> {

        @ProcessElement
        public void processElement(@Element final PubsubMessage message,
                                   final OutputReceiver<Mutation> out) {
            out.output(Mutation.newInsertOrUpdateBuilder(TEST_TABLE)
                    .set("id").to(UUID.randomUUID().toString())
                    .set("string").to("test")
                    .set("count").to(Long.MAX_VALUE)
                    .build()
            );
        }
    }

    interface TestPipelineOptions extends DataflowPipelineOptions {

        void setInputSubscription(String inputSubscription);

        @Description("Google Pubsub subscription id.")
        String getInputSubscription();
    }

}

表定义:CREATE TABLE test (id STRING(50) NOT NULL, string STRING(50) NOT NULL, count INT64) PRIMARY KEY (id);

此问题似乎出现在 Apache Beam 2.18 版中,但不会出现在 2.17 版中。

Apache Beam 2.18 版的这个问题在此处跟踪:https://issues.apache.org/jira/browse/BEAM-9505