PyFlink 如何读取 Table API 的多目录?
How to read multi directories with Table API in PyFlink?
我想在 PyFlink 中用 Table API 读取多个目录,
from pyflink.table import StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
if __name__ == 'main__':
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)
table_env = StreamTableEnvironment.create(stream_execution_environment=env)
table_env \
.get_config() \
.get_configuration() \
.set_string("default.parallelism", "1")
ddl = """
CREATE TABLE test (
a INT,
b STRING
) WITH (
'connector' = 'filesystem',
'path' = '{path}',
'format' = 'csv',
'csv.ignore-first-line' = 'true',
'csv.ignore-parse-errors' = 'true',
'csv.array-element-delimiter' = ';'
)
""".format(path='/opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16')
table_env.execute_sql(ddl)
但因以下错误而失败:
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16 does not exist or the user running Flink ('root') has insufficient permissions to access it.
我确定这三个目录存在并且我有权访问它:
/opt/data/day=2021-11-14,
/opt/data/day=2021-11-15,
/opt/data/day=2021-11-16
如果无法读取多目录,我必须创建三个表,然后合并它们,这会更冗长。
如有任何建议,我们将不胜感激。谢谢
正在使用
'path' = '/opt/data'
应该够用了。文件系统连接器还能够读取 partition field 并基于它执行过滤。例如,您可以使用此模式定义 table:
CREATE TABLE test (
a INT,
b STRING,
day DATE
) PARTITIONED BY (day) WITH (
'connector' = 'filesystem',
'path' = '/opt/data',
[...]
)
然后是下面的查询:
SELECT * FROM test WHERE day = '2021-11-14'
将只读取文件 /opt/data/day=2021-11-14
我想在 PyFlink 中用 Table API 读取多个目录,
from pyflink.table import StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
if __name__ == 'main__':
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)
table_env = StreamTableEnvironment.create(stream_execution_environment=env)
table_env \
.get_config() \
.get_configuration() \
.set_string("default.parallelism", "1")
ddl = """
CREATE TABLE test (
a INT,
b STRING
) WITH (
'connector' = 'filesystem',
'path' = '{path}',
'format' = 'csv',
'csv.ignore-first-line' = 'true',
'csv.ignore-parse-errors' = 'true',
'csv.array-element-delimiter' = ';'
)
""".format(path='/opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16')
table_env.execute_sql(ddl)
但因以下错误而失败:
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /opt/data/day=2021-11-14,/opt/data/day=2021-11-15,/opt/data/day=2021-11-16 does not exist or the user running Flink ('root') has insufficient permissions to access it.
我确定这三个目录存在并且我有权访问它:
/opt/data/day=2021-11-14,
/opt/data/day=2021-11-15,
/opt/data/day=2021-11-16
如果无法读取多目录,我必须创建三个表,然后合并它们,这会更冗长。
如有任何建议,我们将不胜感激。谢谢
正在使用
'path' = '/opt/data'
应该够用了。文件系统连接器还能够读取 partition field 并基于它执行过滤。例如,您可以使用此模式定义 table:
CREATE TABLE test (
a INT,
b STRING,
day DATE
) PARTITIONED BY (day) WITH (
'connector' = 'filesystem',
'path' = '/opt/data',
[...]
)
然后是下面的查询:
SELECT * FROM test WHERE day = '2021-11-14'
将只读取文件 /opt/data/day=2021-11-14