如何在单个 PyFlink SQL 作业中有多个 INSERT 语句?
How to have multiple INSERT statements within single PyFlink SQL job?
是否可以在一个 PyFlink 作业中包含多个 INSERT INTO ... SELECT ...
语句(在 Flink 1.13.6 上)?
我创建了一些输出 table,我正在尝试在单个作业中写入这些输出,示例 Python & SQL 看起来像(假设有一个名为 'input' 的输入 table):
sql1 = "INSERT INTO out1 (col1, col2) SELECT col1, col2 FROM input"
sql2 = "INSERT INTO out2 (col3, col4) SELECT col3, col4 FROM input"
env.execute_sql(sql1)
env.execute_sql(sql2)
当这是 运行 在 AWS 上的 Kinesis 内的 Flink 集群内时,我遇到了一个失败:
Cannot have more than one execute() or executeAsync() call in a single
environment.
当我查看 Flink web UI 时,我可以看到有一个名为 insert-into_default_catalog.default_database.out1
的作业。 Flink 是否将每个 INSERT 语句分离成一个单独的作业?看起来它试图为第一个查询创建一个作业,然后未能为第二个查询创建第二个作业。
有没有什么方法可以使用 SQL 将它作为一个单独的作业添加到 运行,而不必离开 SQL 和 Table API?
如果你想做多个INSERT,你需要将它们包装在一个语句集中:
stmt_set = table_env.create_statement_set()
# only single INSERT query can be accepted by `add_insert_sql` method
stmt_set.add_insert_sql(sql1)
stmt_set.add_insert_sql(sql2)
# execute all statements together
table_result = stmt_set.execute()
# get job status through TableResult
print(table_result.get_job_client().get_job_status())
有关详细信息,请参阅 the docs。
是否可以在一个 PyFlink 作业中包含多个 INSERT INTO ... SELECT ...
语句(在 Flink 1.13.6 上)?
我创建了一些输出 table,我正在尝试在单个作业中写入这些输出,示例 Python & SQL 看起来像(假设有一个名为 'input' 的输入 table):
sql1 = "INSERT INTO out1 (col1, col2) SELECT col1, col2 FROM input"
sql2 = "INSERT INTO out2 (col3, col4) SELECT col3, col4 FROM input"
env.execute_sql(sql1)
env.execute_sql(sql2)
当这是 运行 在 AWS 上的 Kinesis 内的 Flink 集群内时,我遇到了一个失败:
Cannot have more than one execute() or executeAsync() call in a single environment.
当我查看 Flink web UI 时,我可以看到有一个名为 insert-into_default_catalog.default_database.out1
的作业。 Flink 是否将每个 INSERT 语句分离成一个单独的作业?看起来它试图为第一个查询创建一个作业,然后未能为第二个查询创建第二个作业。
有没有什么方法可以使用 SQL 将它作为一个单独的作业添加到 运行,而不必离开 SQL 和 Table API?
如果你想做多个INSERT,你需要将它们包装在一个语句集中:
stmt_set = table_env.create_statement_set()
# only single INSERT query can be accepted by `add_insert_sql` method
stmt_set.add_insert_sql(sql1)
stmt_set.add_insert_sql(sql2)
# execute all statements together
table_result = stmt_set.execute()
# get job status through TableResult
print(table_result.get_job_client().get_job_status())
有关详细信息,请参阅 the docs。