Apache Beam 2.34.0 SqsIO illegal mutation exception

I'm trying to read from an SQS queue in batch mode and write to a local file using Apache Beam 2.34.0 and the AWS Beam SDK v1 module, and it throws an illegal mutation exception.

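import java.io.IOException;
import java.util.Objects;

import com.amazonaws.services.sqs.model.Message;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.io.aws.sqs.SqsIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class SqsReader {

    // Assumed shape of the custom options (inferred from the getters used below);
    // extending AwsOptions provides the AWS region and credentials options.
    public interface SqsReaderOptions extends AwsOptions {
        String getSourceQueueUrl();
        void setSourceQueueUrl(String value);
        Long getNumberOfRecords();
        void setNumberOfRecords(Long value);
        String getLocalOutputLocation();
        void setLocalOutputLocation(String value);
        Integer getNumShards();
        void setNumShards(Integer value);
    }

    public void run(String[] args) {
        SqsReaderOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(SqsReaderOptions.class);
        // Create the pipeline from the parsed options.
        Pipeline p = Pipeline.create(options);

        // withMaxNumRecords turns the unbounded SQS read into a bounded (batch) read.
        p.apply(SqsIO.read().withQueueUrl(options.getSourceQueueUrl())
                        .withMaxNumRecords(options.getNumberOfRecords()))
                .apply(ParDo.of(new SqsMessageToJson()))
                .apply(TextIO.write()
                        .to(options.getLocalOutputLocation())
                        .withNumShards(options.getNumShards()));

        p.run().waitUntilFinish();
    }

    public static void main(String[] args) throws IOException {
        new SqsReader().run(args);
    }

    // Emits each message body (expected to be JSON) as one line of text.
    public static class SqsMessageToJson extends DoFn<Message, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String message = Objects.requireNonNull(c.element()).getBody();
            c.output(message);
        }
    }
}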

I get the following exception:

Jan 10, 2022 11:37:05 AM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
WARNING: Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c, maxNumRecords=1, maxReadTime=null}

Exception in thread "main" org.apache.beam.sdk.util.IllegalMutationException: PTransform SqsIO.Read/Read(SqsUnboundedSource)/Read/ParMultiDo(Read) mutated value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2FXnTVQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}} after it was output (new value was ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: DeVRF8vQATm1f+rHIvR3eaejlRHksL1R7WE4zDT7lSwdIs9gJCYKXFXnTVQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}}). Values must not be mutated in any way after being output.
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
    at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:231)
    at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:163)
    at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
    at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2KQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}} mutated illegally, new value was ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}}. Encoding was rO.
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
    ... 10 more
Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj=,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}} mutated illegally, new value was ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQE==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}}. Encoding was rO2Mw.

The same code works without any problems in Apache Beam 2.31.0. What am I missing here?

The I/O in Beam 2.34.0 is much more complex than in 2.31.0. In Beam 2.34.0, the deleteBatch logic filters the messages to delete based on the inflight state. However, the extend logic makes assumptions under which the inflight state is modified to exclude messages that are assumed to have expired or to be about to expire. These messages are neither explicitly requested by the I/O to be deleted from SQS nor dropped by the I/O itself (the I/O may still be processing a message that it assumes has expired and is waiting to be redelivered).
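The interaction described above can be illustrated with a toy model. This is a minimal, hypothetical Java sketch, not Beam's SqsUnboundedReader code: the class InflightDeleteSketch and its methods receive, extend, and deleteBatch are illustrative names, and it assumes the inflight state is simply a map from message ID to receipt handle that the delete step consults.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of the inflight-state interaction; NOT Beam's SqsUnboundedReader.
class InflightDeleteSketch {
    // Message ID -> receipt handle for messages the reader still tracks as in flight.
    final Map<String, String> inflight = new LinkedHashMap<>();
    // IDs of messages that were deleted from the (simulated) queue.
    final List<String> deleted = new ArrayList<>();

    void receive(String messageId, String receiptHandle) {
        inflight.put(messageId, receiptHandle);
    }

    // "Extend" step: drops messages assumed to be (about to be) expired from the
    // inflight state, without deleting them from the queue or stopping their processing.
    void extend(Set<String> assumedExpired) {
        inflight.keySet().removeAll(assumedExpired);
    }

    // Delete step: only IDs still present in the inflight state are deleted.
    void deleteBatch(List<String> processedIds) {
        for (String id : processedIds) {
            if (inflight.remove(id) != null) {
                deleted.add(id);
            }
        }
    }

    public static void main(String[] args) {
        InflightDeleteSketch reader = new InflightDeleteSketch();
        reader.receive("m1", "handle-1");
        reader.receive("m2", "handle-2");
        reader.extend(Set.of("m2"));             // m2 is assumed to have expired
        reader.deleteBatch(List.of("m1", "m2")); // both were in fact processed
        System.out.println("deleted: " + reader.deleted); // prints "deleted: [m1]"
    }
}
```

In this model m2 was processed, but because the extend step had already removed it from the inflight state it is never deleted, so it stays in the queue and SQS would hand it out again with a new receipt handle.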

Filed https://issues.apache.org/jira/browse/BEAM-13627.

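I'm not sure, though, whether pulling the same message again within the same bundle with a new receipt handle would cause the mutation detection issue: the receipt handle is part of the message's hash code, so the two copies would be treated as different values unless the mutation detector handles this differently.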
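To make the hash-code point concrete, here is a minimal sketch using a hypothetical value class, SqsMessageValue (not the AWS SDK's Message), whose equals and hashCode include the receipt handle, as the answer states for the SDK's message class. A redelivered copy of the same message then compares unequal to the first one.

```java
import java.util.Objects;

// Hypothetical value class with a few SQS message fields. As described above for the
// SDK's Message class, the receipt handle takes part in equals() and hashCode().
final class SqsMessageValue {
    final String messageId;
    final String receiptHandle;
    final String body;

    SqsMessageValue(String messageId, String receiptHandle, String body) {
        this.messageId = messageId;
        this.receiptHandle = receiptHandle;
        this.body = body;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SqsMessageValue)) {
            return false;
        }
        SqsMessageValue other = (SqsMessageValue) o;
        return messageId.equals(other.messageId)
                && receiptHandle.equals(other.receiptHandle)
                && body.equals(other.body);
    }

    @Override
    public int hashCode() {
        return Objects.hash(messageId, receiptHandle, body);
    }

    public static void main(String[] args) {
        SqsMessageValue first = new SqsMessageValue("b72383f9", "handle-1", "{}");
        SqsMessageValue redelivered = new SqsMessageValue("b72383f9", "handle-2", "{}");
        // Same message ID and body, but a new receipt handle: not equal.
        System.out.println(first.equals(redelivered)); // prints "false"
    }
}
```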

TL;DR: debugging process

The mutation is detected in SqsUnboundedSource; it is not caused by any other code in the pipeline.

The code that logs the warning and throws the exception is here.

The only field that changes is the receipt handle. As documented here:

If you receive a message more than once, each time you receive it, you get a different receipt handle. You must provide the most recently received receipt handle when you request to delete the message (otherwise, the message might not be deleted).
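The receipt-handle rule quoted above can be modeled with a small in-memory queue. FakeQueue is a hypothetical, simplified stand-in, not the AWS SDK: it ignores visibility timeouts (every receive of an undeleted message is treated as a redelivery) and always rejects a delete that uses a stale handle, whereas the documentation only says the message might not be deleted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical in-memory model of the SQS receipt-handle rules; not the AWS SDK.
class FakeQueue {
    private final Map<String, String> bodies = new HashMap<>();       // messageId -> body
    private final Map<String, String> latestHandle = new HashMap<>(); // messageId -> newest handle

    void send(String messageId, String body) {
        bodies.put(messageId, body);
    }

    // Each receive of a not-yet-deleted message issues a fresh receipt handle.
    Optional<String> receive(String messageId) {
        if (!bodies.containsKey(messageId)) {
            return Optional.empty();
        }
        String handle = UUID.randomUUID().toString();
        latestHandle.put(messageId, handle);
        return Optional.of(handle);
    }

    // Simplification: deletes only when given the most recently issued receipt handle.
    boolean delete(String messageId, String receiptHandle) {
        if (!receiptHandle.equals(latestHandle.get(messageId))) {
            return false;
        }
        bodies.remove(messageId);
        latestHandle.remove(messageId);
        return true;
    }

    public static void main(String[] args) {
        FakeQueue q = new FakeQueue();
        q.send("m1", "{\"test-key\": \"test-value\"}");
        String first = q.receive("m1").get();
        String second = q.receive("m1").get(); // not deleted yet, so received again
        System.out.println("handles differ: " + !first.equals(second));
        System.out.println("delete with stale handle: " + q.delete("m1", first));
        System.out.println("delete with latest handle: " + q.delete("m1", second));
    }
}
```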

The aws_java_sdk_version did not change between Beam 2.31.0 and Beam 2.34.0, so the AWS SDK shouldn't be the culprit.

There were significant changes to SqsUnboundedReader between Beam 2.31.0 and Beam 2.34.0.

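For a message to be received more than once, it must not have been deleted after it was first received. The delete logic is invoked from SqsCheckpointMark.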

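This issue appears to be caused by a non-deterministic coder (SerializableCoder.of(Message.class)) combined with using the SQS reader in batch mode. Batch mode is implemented using BoundedReadFromUnboundedSource, which causes the problem. Its use is discouraged.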
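The check that fails here compares a value's encoding when it is output with its encoding later, when the bundle is committed. Below is a minimal sketch of that idea, assuming plain Java serialization (which SerializableCoder is built on) and a hypothetical mutable message class; MutationCheckSketch and its methods are illustrative names, and Beam's real CodedValueMutationDetector is more involved (it also compares the coder's structural values).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical, simplified model of encoding-based mutation detection; not Beam's code.
class MutationCheckSketch {
    // Mutable stand-in for an SQS message; only the receipt handle changes below.
    static class Msg implements Serializable {
        private static final long serialVersionUID = 1L;
        String messageId;
        String receiptHandle;

        Msg(String messageId, String receiptHandle) {
            this.messageId = messageId;
            this.receiptHandle = receiptHandle;
        }
    }

    // Encodes with plain Java serialization, the mechanism SerializableCoder relies on.
    static byte[] encode(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Throws if the value no longer encodes to the snapshot taken at output time.
    static void verifyUnmodified(byte[] snapshot, Serializable value) {
        if (!Arrays.equals(snapshot, encode(value))) {
            throw new IllegalStateException("value mutated after it was output");
        }
    }

    public static void main(String[] args) {
        Msg msg = new Msg("b72383f9", "AQEB-first");
        byte[] snapshot = encode(msg);     // taken when the element is output
        msg.receiptHandle = "AQEB-second"; // simulate the receipt handle changing afterwards
        try {
            verifyUnmodified(snapshot, msg);
            System.out.println("unchanged");
        } catch (IllegalStateException e) {
            System.out.println("detected: " + e.getMessage());
        }
    }
}
```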

You can follow BEAM-13631 to track progress on fixing the coder for SQS messages.

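At the moment I can't tell you which change between 2.31 and 2.34 triggered this issue, but it's probably not a change in SQS IO itself. I'll keep investigating and hopefully provide an update later.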

For now, I suggest trying a couple of things:

  • First, avoid batch mode if you can (that is, set neither maxNumRecords nor maxReadTime). I'm fairly confident this will solve your problem.

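  • As of recent Beam versions, there is a separate module for AWS SDK v2, beam-sdks-java-io-amazon-web-services2 (which is why I filed the issue above). It uses a custom message class for transport instead of the AWS SDK one, and its encoding should be deterministic. However, having recently started working on the SDK v2 IOs, I noticed a few other bugs: retry on invalid receipt handles, SQS clients closed too early.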
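The point about deterministic encoding can be illustrated with a hypothetical hand-written encoder: fields are written in a fixed order and attributes are sorted by key, so equal messages always produce identical bytes. This is only a sketch of the property; DeterministicEncodingSketch is an illustrative name, not the message class or coder used by beam-sdks-java-io-amazon-web-services2.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical deterministic encoder for a few SQS message fields; not Beam's coder.
class DeterministicEncodingSketch {
    // Length-prefixed UTF-8, so bodies larger than writeUTF's 64 KB limit also work.
    private static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8);
    }

    static byte[] encode(String messageId, String receiptHandle, String body,
                         Map<String, String> attributes) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeString(out, messageId);
            writeString(out, receiptHandle);
            writeString(out, body);
            // Sorting by key fixes the order, whatever Map implementation the caller used.
            Map<String, String> sorted = new TreeMap<>(attributes);
            out.writeInt(sorted.size());
            for (Map.Entry<String, String> entry : sorted.entrySet()) {
                writeString(out, entry.getKey());
                writeString(out, entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, String> a = new LinkedHashMap<>();
        a.put("SentTimestamp", "1641794775474");
        a.put("ApproximateReceiveCount", "1");
        Map<String, String> b = new LinkedHashMap<>();
        b.put("ApproximateReceiveCount", "1");
        b.put("SentTimestamp", "1641794775474");
        // Equal attributes inserted in a different order encode to the same bytes.
        System.out.println(Arrays.equals(
                encode("b72383f9", "handle-1", "{}", a),
                encode("b72383f9", "handle-1", "{}", b))); // prints "true"
    }
}
```

Sorting the attributes matters because a hash-based map's iteration order is not part of its contract, so encoding such a map as-is can yield different bytes for equal maps.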

Let me know if that helps.