如何序列化在 Apache Beam 中创建 class 的运行时

How to serialize runtime created class in Apache Beam

我有一个 apache-beam 应用程序,它在本地使用直接运行器运行管道,在 google 云中使用数据流运行器运行管道。它在本地工作但无法通过 google 数据流运行程序。

错误跟踪如下:

(9938ce94c0752c7): java.lang.RuntimeException: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.typedApply(MapTaskExecutorFactory.java:283) at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.typedApply(MapTaskExecutorFactory.java:253) at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55) at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43) at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78) at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:142) at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:271) at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244) at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135) at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115) at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214) at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053) at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899) at com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95) at com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:66) at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:360) at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.typedApply(MapTaskExecutorFactory.java:271) ... 14 more
Caused by: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:75) at com.google.cloud.dataflow.worker.UserParDoFnFactory$UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:64) at com.google.cloud.dataflow.worker.UserParDoFnFactory.call(UserParDoFnFactory.java:100) at com.google.cloud.dataflow.worker.UserParDoFnFactory.call(UserParDoFnFactory.java:97) at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.load(LocalCache.java:4904) at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628) at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336) at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295) at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208) ... 20 more
Caused by: java.lang.ClassNotFoundException: Header_H at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:628) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1486) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:72) ... 28 more

指向

"... unable to deserialize Serialized DoFnInfo"

"... java.lang.ClassNotFoundException: Header_H "

我怀疑这与我使用bytebuddy代码创建class Header_H有关。我使用 bytebuddy 基于现有源代码中的 some.class 和运行时来自配置文件的额外用户输入构建了一个 subclass,即 Header_H 仅在运行时可用。

我的bytebuddy代码有点像这样:

builder = new ByteBuddy().subclass(some.class).name("Header_H").modifiers(PUBLIC);
       .defineField("serialVersionUID", long.class, STATIC, PRIVATE, FINAL).value(37L)
       .implement(Serializable.class);

Class <?> clazz = builder.make().load(getClass().getClassLoader()).getLoaded();

然后clazz(在本例中为Header_H)将被传递到数据流中的管道。当我检查临时 google 云阶段位置中的 jar 文件的内容时,我看到 some.class 而不是 Header_H.class,这可能导致错误 "ClassNotFoundException"。

所以如果我的推理是正确的,那么我怎样才能让 Beam 将运行时创建 class 放在 jar 文件中以发送到数据流运行器,因为我的 implement(Serializable.class) =40=]创造?

Dataflow 运行器不控制 JAR 文件的内容 - 它只解析程序的类路径,从磁盘读取 JAR 并将它们复制到 GCS 上管道的暂存目录 .现在,Beam 不提供运送 类 不包含在您的类路径中的 JAR 中的方法。

您可能需要找到一种方法来仅使用管道规范中的那些 JAR 中的 类,但是您当然仍然可以在 DoFn 或在工作线程上运行的其他代码中使用 ByteBuddy本地。但请注意,任何将在 worker 之间传输的内容(例如 PCollection 的内容)仍然必须是可序列化的(在一个 worker 上可序列化,在另一个 worker 上可反序列化)或具有编码器。

或者,可能有一种方法可以让 ByteBuddy 生成 JAR 并将其动态添加到程序的类路径中。这可能有效,但这是一个特定于 ByteBuddy 的问题,我对 ByteBuddy 不够熟悉,不知道如何去做。

Byte Buddy 可以通过以下方式在 jar 文件中注入 class:

DynamicType.Unloaded<?> type = builder.make();
builder.inject(someJar);

这将更改现有的 jar 文件以包含动态生成的 class。这样,您可以更改系统 class 路径上已有的现有 jar。

此 API 还允许您创建一个新的 jar,并且您可以使用 Instrumentation API(通过 Java 代理)允许您追加这个 class 作为一个新的 jar 文件到 class 路径。为避免附加代理,您还可以尝试使用 byte-buddy-agent 项目进行动态附件。

这适用于:

File someFolder = ...
File jar = builder.saveIn(someFolder);
ByteBuddyAgent.install().appendToSystemClassLoaderSearch(new JarFile(jar));

如果 Google Cloud 不允许动态附件,您可以通过命令行上的常规附件来解决此问题。