FlatMapElements in Kotlin with Beam: non-serializable lambda

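I have an existing Apache Beam project that uses Java 8, Apache Beam 2.27.0, Maven, and Dagger 2.

I migrated this project to Kotlin: Kotlin JDK 8, version 1.5.0.

I used Kotlin 1.5.0 because version 1.4.3 had a problem with Beam and the Kotlin Maven plugin (unable to read class: VirtualFile).

Everything seems to work, except when I use the native MapElements or FlatMapElements with a TypeDescriptor and a lambda expression.

Part of my pom.xml file: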

<properties>
        <beam.version>2.27.0</beam.version>

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kotlin.code.style>official</kotlin.code.style>
        <kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
        <kotlin.compiler.incremental>true</kotlin.compiler.incremental>

        <kotlin.version>1.5.0</kotlin.version>
        <serialization.version>1.2.0</serialization.version>
        <java.version>1.8</java.version>
        
        <dagger.version>2.35.1</dagger.version>
        <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
        <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
        <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
</properties>


<dependencies>
       <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-stdlib-jdk8</artifactId>
            <version>${kotlin.version}</version>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlinx</groupId>
            <artifactId>kotlinx-serialization-json</artifactId>
            <version>${serialization.version}</version>
        </dependency>
        
        <dependency>
                <groupId>org.apache.beam</groupId>
                <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
                <version>${beam.version}</version>
                <scope>runtime</scope>
            </dependency>

            <dependency>
                <groupId>org.apache.beam</groupId>
                <artifactId>beam-sdks-java-core</artifactId>
                <version>${beam.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.beam</groupId>
                <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
                <version>${beam.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.beam</groupId>
                <artifactId>beam-sdks-java-io-redis</artifactId>
                <version>${beam.version}</version>
            </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-test-junit</artifactId>
            <version>${kotlin.version}</version>
            <scope>test</scope>
        </dependency>
</dependencies>

<build>
        <plugins>

           <plugin>
                <groupId>org.jetbrains.kotlin</groupId>
                <artifactId>kotlin-maven-plugin</artifactId>
                <version>${kotlin.version}</version>
                <executions>
                    <execution>
                        <id>kapt</id>
                        <goals>
                            <goal>kapt</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <sourceDir>src/main/kotlin</sourceDir>
                            </sourceDirs>
                            <annotationProcessorPaths>
                                <annotationProcessorPath>
                                    <groupId>com.google.dagger</groupId>
                                    <artifactId>dagger-compiler</artifactId>
                                    <version>${dagger.version}</version>
                                </annotationProcessorPath>
                            </annotationProcessorPaths>
                        </configuration>
                    </execution>
                    <execution>
                        <id>compile</id>
                        <phase>process-sources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <sourceDir>src/main/kotlin</sourceDir>
                            </sourceDirs>
                        </configuration>
                    </execution>

                    <execution>
                        <id>test-kapt</id>
                        <goals>
                            <goal>test-kapt</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <sourceDir>src/test/kotlin</sourceDir>
                            </sourceDirs>
                            <annotationProcessorPaths>
                                <annotationProcessorPath>
                                    <groupId>com.google.dagger</groupId>
                                    <artifactId>dagger-compiler</artifactId>
                                    <version>${dagger.version}</version>
                                </annotationProcessorPath>
                            </annotationProcessorPaths>
                        </configuration>
                    </execution>
                    <execution>
                        <id>test-compile</id>
                        <goals>
                            <goal>test-compile</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <sourceDir>src/test/kotlin</sourceDir>
                                <sourceDir>target/generated-sources/kapt/test</sourceDir>
                            </sourceDirs>
                        </configuration>
                    </execution>
                </executions>
                <configuration>
                    <compilerPlugins>
                        <plugin>kotlinx-serialization</plugin>
                    </compilerPlugins>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.jetbrains.kotlin</groupId>
                        <artifactId>kotlin-maven-serialization</artifactId>
                        <version>${kotlin.version}</version>
                    </dependency>
                </dependencies>
            </plugin>
            
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>${maven-surefire-plugin.version}</version>
                <dependencies>
                    <dependency>
                        <groupId>org.apache.maven.surefire</groupId>
                        <artifactId>surefire-junit47</artifactId>
                        <version>${maven-surefire-plugin.version}</version>
                    </dependency>
                </dependencies>
            </plugin>

            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>${maven-exec-plugin.version}</version>
                <configuration>
                    <cleanupDaemonThreads>false</cleanupDaemonThreads>
                </configuration>
            </plugin>
        </plugins>
</build>

An object that implements Serializable (java.io):

data class MyObject(
    val field: String = ""
) : Serializable

Basically, I want to run a FlatMapElements with a TypeDescriptor and a lambda (behind the scenes it is a SerializableFunction):

class MyTransform(private val redisConnectionConf: RedisConnectionConfiguration) :
    PTransform<PBegin, PCollection<MyObject>>() {

    override fun expand(input: PBegin): PCollection<MyObject> {
        return input
            .apply(RedisIO.read().withConnectionConfiguration(redisConnectionConf).withKeyPattern("my-pattern*"))
            .apply(
                FlatMapElements.into(of(MyObject::class.java))
                    .via(SerializableFunction<KV<String, String>, List<MyObject>> { toMyObjects(it) })
            )
    }

    fun toMyObjects(entry: KV<String, String>): List<MyObject> {
        val key = entry.key
        val value = entry.value

        val ref = object : TypeReference<List<MyObject>>() {}
        return OBJECT_MAPPER.readValue(value, ref)
    }
}

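I deliberately changed the code and moved part of it into the "toMyObjects" method to give as much detail as possible. The "OBJECT_MAPPER" object is a Jackson ObjectMapper.

With Java 8 and Beam 2.27.0, this basic code works fine.

With Kotlin, this code fails to run with the following error: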

at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray (SerializableUtils.java:59)
    at org.apache.beam.runners.core.construction.ParDoTranslation.translateDoFn (ParDoTranslation.java:692)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translateDoFn (PrimitiveParDoSingleFactory.java:218)
    at org.apache.beam.runners.core.construction.ParDoTranslation.payloadForParDoLike (ParDoTranslation.java:814)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle (PrimitiveParDoSingleFactory.java:214)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate (PrimitiveParDoSingleFactory.java:163)
    at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:429)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:239)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
    at org.apache.beam.runners.core.construction.PipelineTranslation.visitPrimitiveTransform (PipelineTranslation.java:87)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:587)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access0 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:59)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:933)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:196)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at myPackage.MyApp.main (MyApp.kt:44)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.io.NotSerializableException: Non-serializable lambda
    at mypackage.MyTransform$$Lambda3/1784079343.writeObject (Unknown Source)


[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project my-project: 
An exception occured while executing the Java class. unable to serialize 
DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.transforms.FlatMapElements@23402e70,
mainOutputTag=Tag<org.apache.beam.sdk.values.PCollection.<init>:402#6929f09b03d242ca>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}: Non-serializable lambda -> [Help 1]

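The SerializableUtils.serializeToByteArray method in the Beam SDK throws this error: java.io.NotSerializableException: Non-serializable lambda

MyObject is Serializable, and the lambda is wrapped in a Beam SerializableFunction (a function that implements Serializable).

Normally in this case, Beam gets a SerializableCoder from the Serializable object. I don't understand why Beam considers the lambda non-serializable.

I don't get this behavior with plain Java.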
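To narrow this down without launching a pipeline, the function object can be pushed through plain Java serialization, which is the step the stack trace shows failing. The following is only a minimal sketch: `Fn` is a hypothetical stand-in for Beam's `SerializableFunction` (so the snippet needs no Beam dependency), and `serializationError` and `Trim` are hypothetical names introduced for illustration.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.NotSerializableException
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical stand-in for Beam's SerializableFunction (assumption, not the Beam type).
fun interface Fn<A, B> : Serializable {
    fun apply(input: A): B
}

// Hypothetical helper: returns null when Java serialization succeeds,
// otherwise the message of the NotSerializableException.
fun serializationError(candidate: Any): String? =
    try {
        ObjectOutputStream(ByteArrayOutputStream()).use { it.writeObject(candidate) }
        null
    } catch (e: NotSerializableException) {
        e.message ?: e.javaClass.name
    }

// A named class is compiled to an ordinary class file and serializes fine.
class Trim : Fn<String, String> {
    override fun apply(input: String): String = input.trim()
}

fun main() {
    // Named class: no error is expected here.
    println(serializationError(Trim()))

    // SAM-converted lambda: the result depends on how the compiler generated it.
    val lambda = Fn<String, String> { it.trim() }
    println(serializationError(lambda))
}
```

Whether the lambda line reports an error depends on the Kotlin compiler version and flags, so no output is claimed for it here. In the real project the same check would be applied to the actual `SerializableFunction` instance, since a Java interface may be treated differently from the Kotlin `fun interface` used in this sketch.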

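To be precise, if I replace the FlatMapElements/TypeDescriptor/lambda combination with ParDo.of(DoFn), it works fine, but in some cases, for conciseness and readability, I would like to use the built-in MapElements and FlatMapElements with lambda expressions.

Thanks in advance for your help.

When I replace the lambda with a class that implements SerializableFunction, it works: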

class MapString : SerializableFunction<KV<String, String>, List<MyObject>> {
    override fun apply(input: KV<String, String>): List<MyObject> {
        ....
    }
}
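A slightly shorter variant of the same idea is an anonymous object expression, which Kotlin also compiles to a real class rather than a lambda. The sketch below is an illustration under assumptions: `SerFn` is a hypothetical stand-in for Beam's `SerializableFunction`, and `splitCsv` and `roundTrip` are hypothetical helpers that are not part of the original project.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical stand-in for Beam's SerializableFunction (assumption, not the Beam type).
interface SerFn<A, B> : Serializable {
    fun apply(input: A): B
}

// Anonymous object expression: compiled to a regular class that implements SerFn.
fun splitCsv(): SerFn<String, List<String>> =
    object : SerFn<String, List<String>> {
        override fun apply(input: String): List<String> = input.split(",")
    }

// Serializes a value to bytes and reads it back, as a runner would before shipping it to workers.
fun <T : Serializable> roundTrip(value: T): T {
    val bytes = ByteArrayOutputStream()
    ObjectOutputStream(bytes).use { it.writeObject(value) }
    @Suppress("UNCHECKED_CAST")
    return ObjectInputStream(ByteArrayInputStream(bytes.toByteArray())).use { it.readObject() } as T
}

fun main() {
    val copy = roundTrip(splitCsv())
    println(copy.apply("a,b,c")) // prints [a, b, c]
}
```

This is more verbose than a lambda, but it keeps the function next to the transform that uses it instead of in a separate named class.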

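I'm leaving this question open because I want a solution that uses lambda expressions.

I finally found a solution: I downgraded the Kotlin version (dependency + plugin) to 1.4.21.

In this case, the non-serializable lambda problem disappears, and the Kotlin Maven plugin compiles without the VirtualFile problem.

This issue helped me a lot, thanks: https://youtrack.jetbrains.com/issue/KT-45067

It would be great if, in the future, the Kotlin Maven plugin worked correctly with 1.4.x versions higher than 1.4.21.

Beam developers using Kotlin and Maven should watch out for this issue: 1.4.32 cannot compile with Beam, and 1.5.0 has the non-serializable lambda problem at runtime.

If you want to use Kotlin 1.5, try the following workaround: pass -Xsam-conversions=class to the compiler.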

<plugins>
  <plugin>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-maven-plugin</artifactId>
    <version>${kotlin.version}</version>
    <configuration>
      <args>
        <arg>-Xsam-conversions=class</arg>
      </args>
    </configuration>
  </plugin>
</plugins>

Reference: https://youtrack.jetbrains.com/issue/KT-46359#focus=Comments-27-4862857.0-0