Apache Beam TextIO writer is not writing unbounded source to file

The following code runs on the Beam direct runner without any errors. The SQS messages are consumed, but they are never written to the destination.


Options options = PipelineOptionsFactory.fromArgs(CONFIG_STREAMING_SQS_GCS).withValidation().as(Options.class);

BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials("your-key", "your-secret");
options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(basicAWSCredentials));
// AwsUtils.setupOptions(options); <- fetches the secret from GCP, but replaced with inline Auth
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read messages from Sqs", SqsIO.read().withQueueUrl(options.getInputQueueUrl()))
        .apply("Get message contents", ParDo.of(new SqsMessageToJson()))
        .apply("Print incoming", ParDo.of(new RowPrinter<>("Print incoming")))
        .apply("Create Window", Window.into(FixedWindows.of(Duration.standardSeconds(10))))
        .apply("Write to GCS", TextIO.write()
            .withWindowedWrites()
            .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(options.getDestinationBucketUrl()))
            .to(new WindowedFilenamePolicy(options.getOutputFilenamePrefix(),
                options.getShardTemplate(),
                options.getOutputFilenameSuffix())
                .withSubDirectoryPolicy(options.getSubDirectoryPolicy()))
            .withNumShards(options.getNumShards()));
PipelineResult run = pipeline.run();
run.waitUntilFinish();

This looks like an issue in Apache Beam itself. We have filed a support ticket with Google to investigate it.