Apache 光束初始化程序

Apache beam Initializer

在我的数据流作业中,我需要在实际处理开始之前初始化配置工厂并在审计日志中记录某些消息。

我已经将 Config 工厂初始化代码 + 审核日志记录放在父 class PlatformInitializer 中,并将其扩展到我的主管道 class。

public class CustomJob extends PlatformInitializer implements Serializable {
    public static void main(String[] args) throws PropertyVetoException {
        CustomJob myCustomjob = new CustomJob();

        // Initialize config factories
        myCustomjob.initialize();

        // trigger dataflow job
        myCustomjob.parallelRead(args);
    }

因此,我还必须在我的管道 class 中实现 Serializable 接口,因为 beam 抛出错误 - java.io.NotSerializableException: org.devoteam.CustomJob

在 PlatformInitializer 内部,我有一个 initilize() 方法,它包含配置工厂的初始化逻辑并记录一些初始审计消息。

public class PlatformInitializer {

    public void initialize() {
        // Configfactory factory = new Configfactory()
        // CustomLogger.log("JOB-BEGIN-EVENT" + Current timestamp )
    }
}

我的问题是 - 这种调用某些需要在管道开始执行之前调用的代码的方法是否正确?

如果您需要在运行时(而不是管道构建时)初始化的对象,您应该将初始化逻辑移至 Beam DoFn. DoFn has a number of method annotations that could be used to denote methods that should be executed in different lifecycle phases. Setup and StartBundle annotations might be useful for your use-case. See 以获取更多详细信息。