Flink:无法将流导入 csv
Flink: Not able to sink a stream into csv
我正在尝试使用 PyFlink 将流以 csv 格式导入文件系统,但是它不起作用。
# stream_to_csv.py
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.execute_sql("""
CREATE TABLE datagen (
id INT,
data STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
)
""")
table_env.execute_sql("""
CREATE TABLE print (
id INT,
data STRING
) WITH (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/tmp/output'
)
""")
table_env.execute_sql("""
INSERT INTO print
SELECT id, data
FROM datagen
""").wait()
到运行脚本:
$ python stream_to_csv.py
我希望记录进入 /tmp/output 文件夹,但事实并非如此。
$ ~ ls /tmp/output
(nothing shown here)
有什么想念的吗?
我厚着脸皮复制了典富在http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Not-able-to-sink-a-stream-into-csv-td43105.html的回复。
您需要为文件系统设置滚动策略。您可以参考滚动策略部分 [1] 了解更多详细信息。
其实有输出,你可以执行命令ls -la /tmp/output/
,然后你会看到几个名为“.part-xxx”的文件。
对于您的工作,您需要在配置中设置 execution.checkpointing.interval
并在文件系统连接器的 属性 中设置 sink.rolling-policy.rollover-interval
。
作业将如下所示:
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")
table_env.execute_sql("""
CREATE TABLE datagen (
id INT,
data STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
)
""")
table_env.execute_sql("""
CREATE TABLE print (
id INT,
data STRING
) WITH (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/tmp/output',
'sink.rolling-policy.rollover-interval' = '10s'
)
""")
table_env.execute_sql("""
INSERT INTO print
SELECT id, data
FROM datagen
""").wait()
我正在尝试使用 PyFlink 将流以 csv 格式导入文件系统,但是它不起作用。
# stream_to_csv.py
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.execute_sql("""
CREATE TABLE datagen (
id INT,
data STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
)
""")
table_env.execute_sql("""
CREATE TABLE print (
id INT,
data STRING
) WITH (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/tmp/output'
)
""")
table_env.execute_sql("""
INSERT INTO print
SELECT id, data
FROM datagen
""").wait()
到运行脚本:
$ python stream_to_csv.py
我希望记录进入 /tmp/output 文件夹,但事实并非如此。
$ ~ ls /tmp/output
(nothing shown here)
有什么想念的吗?
我厚着脸皮复制了典富在http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Not-able-to-sink-a-stream-into-csv-td43105.html的回复。
您需要为文件系统设置滚动策略。您可以参考滚动策略部分 [1] 了解更多详细信息。
其实有输出,你可以执行命令ls -la /tmp/output/
,然后你会看到几个名为“.part-xxx”的文件。
对于您的工作,您需要在配置中设置 execution.checkpointing.interval
并在文件系统连接器的 属性 中设置 sink.rolling-policy.rollover-interval
。
作业将如下所示:
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")
table_env.execute_sql("""
CREATE TABLE datagen (
id INT,
data STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
)
""")
table_env.execute_sql("""
CREATE TABLE print (
id INT,
data STRING
) WITH (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/tmp/output',
'sink.rolling-policy.rollover-interval' = '10s'
)
""")
table_env.execute_sql("""
INSERT INTO print
SELECT id, data
FROM datagen
""").wait()