无法创建模板

Unable to create a template

我正在尝试使用下面的 mvn 命令创建数据流模板并且我在存储桶中有一个 json 配置文件,我需要为每个 运行 读取不同的配置文件(我不想要硬编码值`) 代码如下

在模板创建过程中,我为 getSchemaJson 方法传递的所有 3 个参数都低于错误 Value only available at runtime, but accessed from a non-runtime context.

如果可以在运行时间为 getSchemaJson 方法

发送参数,有人可以帮助我吗
-Dexec.mainClass=com.mainClass \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=project \
--stagingLocation=gs://001test_data_flow/staging \
--tempLocation=gs://001test_data_flow/temp \
--templateLocation=gs://001test_data_flow/templates/dataflow.json \
--runner=DataflowRunner \
--region=region"``

public static String getSchemaJson(ValueProvider<String> projectId, ValueProvider<String> bucketName, ValueProvider<String> filename) {
       String fileContent;
       try {
           Storage storage = StorageOptions.newBuilder().setProjectId(String.valueOf(projectId)).build().getService();
           Blob blob = storage.get(String.valueOf(bucketName), String.valueOf(filename));
           fileContent = new String(blob.getContent());
           return fileContent;
       }
       catch (Exception e) {
           LOG.error("Exception Occurred While Processing Schema!!"+e.getMessage());
           return null;
       }
   }

ValueProvider 值只能在运行时访问。您正试图在导致错误的管道构建期间访问这些值。有关 define/use Dataflow 经典模板的 ValueProvider 的正确方法,请参阅 here

或者,您可以定义一个 Dataflow flex templates,这样就不需要 ValueProvider。

您的代码使用了经典的 Dataflow 模板,它有一些限制。最大的一个(也是对您有影响的)是它们不支持动态参数。这意味着,所有参数必须在 运行.

时可用

原因是,Dataflow 使用 DAG 生成编译模板并上传到 --templateLocation。该文件包含触发作业时提供的参数。因此,作业的每个步骤(在 DAG 中)都应该是 pre-determined,因此它们不应该具有动态值。因此,filename 的值应在触发时传递。

这是由 Flex templates 修复的,编译模板是在 作业触发后生成的!代码包含在 Docker 图像中,管道 DAG 是使用单独的进程(称为启动器 VM)生成的。由于编译后的模板是在那里生成的,我们可以将动态参数传递给作业,并由这个启动器 VM 进行评估。

因此,建议将 Flex 模板用于生产级管道。

我写了 an article 关于 Python 代码库的 Flex 模板入门,但它应该 more-or-less 与 Java 相同。