PyFlink - JSON 文件接收器?
PyFlink - JSON file sink?
是否可以像 CSV 一样在 Table API and/or DataStream API 中使用 JSON 文件接收器?
谢谢!
代码
my_sink_ddl = f"""
create table mySink (
id STRING,
dummy_item STRING
) with (
'connector.type' = 'filesystem',
'format.type' = 'json',
'connector.path' = 'output.json'
)
"""
错误
TableException: findAndCreateTableSink failed.
是的,根据JiraFLINK-17286 Integrate json to file system connector and the corresponding pull request [FLINK-17286][connectors / filesystem]Integrate json to file system connector #12010,可以从Flink1.11
开始。在 Flink 之前 1.11
我相信它不受支持。
您需要使用以下配置:
... with (
'connector' = 'filesystem',
'format' = 'json',
'path' = 'output_json' -- This must be a directory
)
加上以下环境定义:
t_env = BatchTableEnvironment.create( environment_settings=EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())
是否可以像 CSV 一样在 Table API and/or DataStream API 中使用 JSON 文件接收器?
谢谢!
代码
my_sink_ddl = f"""
create table mySink (
id STRING,
dummy_item STRING
) with (
'connector.type' = 'filesystem',
'format.type' = 'json',
'connector.path' = 'output.json'
)
"""
错误
TableException: findAndCreateTableSink failed.
是的,根据JiraFLINK-17286 Integrate json to file system connector and the corresponding pull request [FLINK-17286][connectors / filesystem]Integrate json to file system connector #12010,可以从Flink1.11
开始。在 Flink 之前 1.11
我相信它不受支持。
您需要使用以下配置:
... with (
'connector' = 'filesystem',
'format' = 'json',
'path' = 'output_json' -- This must be a directory
)
加上以下环境定义:
t_env = BatchTableEnvironment.create( environment_settings=EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())