Hazelcast Jet - 如何在 Jet 管道中使用非静态方法

Hazelcast Jet - how to use a non-static method in a Jet pipeline

我在下面有一个基本的管道。在其中一个步骤中,我想通过调用服务中的方法来转换对象,如下所示。但是 Jet 抛出一个错误,指出此 mapFn 不可序列化。在这里做什么?它非常适合静态方法。

p.readFrom(source)
     .map(r -> dataTransformer.transformRecord(r))// dataTransformer is a service
     .writeTo(Sinks.filesBuilder(userHome).build());

使用 mapUsingService 并使用 ServiceFactory:

创建服务
p.readFrom(source)
 .mapUsingService(
    ServiceFactories.sharedService(pctx -> new DataTransformer()),
    (dataTransformer, r) -> dataTransformer.transformRecord(r))
 ...

或者,如果您的服务是可序列化且无状态的,您可以将其复制到局部变量:

DataTransformer dataTransformerLocal = dataTransformer;
p.readFrom(source)
     .map(r -> dataTransformerLocal.transformRecord(r))
     .writeTo(Sinks.filesBuilder(userHome).build());