Apache beam Text IO writer is not writing unbounded source to file
The following code runs on the Beam direct runner without any errors. The SQS messages are consumed, but they are never written to the destination.
Options options = PipelineOptionsFactory.fromArgs(CONFIG_STREAMING_SQS_GCS).withValidation().as(Options.class);
BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials("your-key", "your-secret");
options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(basicAWSCredentials));
// AwsUtils.setupOptions(options); <- fetches the secret from GCP, but replaced with inline Auth
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read messages from Sqs", SqsIO.read().withQueueUrl(options.getInputQueueUrl()))
        .apply("Get message contents", ParDo.of(new SqsMessageToJson()))
        .apply("Print incoming", ParDo.of(new RowPrinter<>("Print incoming")))
        .apply("Create Window", Window.into(FixedWindows.of(Duration.standardSeconds(10))))
        .apply("Write to GCS", TextIO.write()
                .withWindowedWrites()
                .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(options.getDestinationBucketUrl()))
                .to(new WindowedFilenamePolicy(options.getOutputFilenamePrefix(),
                        options.getShardTemplate(),
                        options.getOutputFilenameSuffix())
                        .withSubDirectoryPolicy(options.getSubDirectoryPolicy()))
                .withNumShards(options.getNumShards()));
PipelineResult run = pipeline.run();
run.waitUntilFinish();
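For context on what has to happen before anything is written: with Beam's default trigger, each 10-second fixed window is emitted only after the watermark passes the end of that window. If the source's watermark never advances, a windowed TextIO write can produce no files even though messages are being read and printed. The sketch below is a simplified, hypothetical plain-Java model (not Beam or SqsIO code; `FixedWindowModel`, `windowStart` and `firedWindows` are made-up names) of fixed-window assignment and watermark-driven firing. It assumes non-negative millisecond timestamps and a zero window offset, and may help when checking whether a stuck watermark is the cause.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model (not Beam code) of 10-second fixed windows
 * that fire only when the watermark reaches the end of the window.
 */
public class FixedWindowModel {
    static final long WINDOW_MILLIS = 10_000L; // mirrors Duration.standardSeconds(10)

    // Start of the fixed window containing a non-negative event timestamp (offset 0 assumed).
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // Returns the start of every non-empty window whose end is at or before the watermark,
    // i.e. the windows that would be complete and written out in this simplified model.
    static List<Long> firedWindows(List<Long> timestamps, long watermarkMillis) {
        Map<Long, Integer> countsPerWindow = new TreeMap<>(); // sorted by window start
        for (long ts : timestamps) {
            countsPerWindow.merge(windowStart(ts), 1, Integer::sum);
        }
        List<Long> fired = new ArrayList<>();
        for (long start : countsPerWindow.keySet()) {
            if (start + WINDOW_MILLIS <= watermarkMillis) {
                fired.add(start);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Long> events = List.of(1_000L, 9_999L, 12_000L, 25_000L);
        // Watermark stuck at 0: no window is complete, so nothing would be written.
        System.out.println(firedWindows(events, 0L));
        // Watermark advanced to 20s: the windows starting at 0 and 10000 are complete.
        System.out.println(firedWindows(events, 20_000L));
    }
}
```

Running `main` prints `[]` for the stuck watermark and `[0, 10000]` once the watermark reaches 20 seconds. The window containing the 25-second event stays open until the watermark reaches 30 seconds.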
It looks like this is an issue in Apache Beam itself. We have opened a support ticket with Google to investigate it.