Flink SQL CSV Streaming Continuously
I am creating two Flink SQL tables, one backed by a CSV filesystem directory and the other by Kafka. The goal is to continuously monitor the directory and push the records of any new CSV files to a Kafka topic. However, the query I wrote below pushes the CSV records once, after which the Flink job goes into FINISHED state and never processes new files dropped into the folder. Can someone tell me how to create a continuous Flink SQL stream with a CSV filesystem source and Kafka as the sink?
Flink SQL to create the source table:
CREATE TABLE son_hsb_source_filesystem_csv_bulk (
    file_name STRING,
    start_time STRING,
    oss_cell_id BIGINT,
    enodeb STRING,
    dl_payload FLOAT,
    rrc_conn_den BIGINT,
    rrc_conn_num BIGINT,
    pm_array_1 STRING
) WITH (
    'connector' = 'filesystem',                      -- Don't change this
    'path' = 'file:///opt/kafka-python-exec/files/', -- Change the path alone
    'format' = 'csv',                                -- Don't change this
    'csv.ignore-parse-errors' = 'true',              -- Don't change this
    'csv.allow-comments' = 'true'                    -- Don't change this
);
Flink SQL to create the sink table:
CREATE TABLE son_hsb_target_kafka_9092_filesystem_bulk_tests (
    file_name STRING,
    start_time STRING,
    oss_cell_id BIGINT,
    enodeb STRING,
    dl_payload FLOAT,
    rrc_conn_den BIGINT,
    rrc_conn_num BIGINT,
    pm_array_1 STRING
) WITH (
    'connector' = 'kafka',                                        -- Don't change this
    'topic' = 'son_hsb_target_kafka_9092_fs_bulk_data_tests',     -- Add any topic name you want
    'scan.startup.mode' = 'earliest-offset',                      -- Don't change this
    'properties.bootstrap.servers' = 'localhost:9092',            -- Don't change this
    'format' = 'json',                                            -- Don't change this
    'json.fail-on-missing-field' = 'false',                       -- Don't change this
    'json.ignore-parse-errors' = 'true'                           -- Don't change this
);
Flink SQL to create a streaming job (this runs once and the job goes to FINISHED):
INSERT INTO son_hsb_target_kafka_9092_filesystem_bulk_tests
SELECT file_name, start_time, oss_cell_id, enodeb, dl_payload, rrc_conn_den, rrc_conn_num, pm_array_1
FROM son_hsb_source_filesystem_csv_bulk;
How do I define a streaming job that stays in RUNNING state and keeps looking for new files? Please advise.
The documentation indicates this has not yet been implemented for streaming filesystem sources:
File system sources for streaming is still under development. In the future, the community will add support for common streaming use cases, i.e., partition and directory monitoring.
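Note that later Flink releases added a `source.monitor-interval` option to the filesystem SQL connector, which turns the source into an unbounded one that periodically rescans the directory and ingests new files; check the connector documentation for your specific Flink version before relying on it. A sketch of the source DDL with that option (the interval value `10 s` is an arbitrary choice here):

```sql
CREATE TABLE son_hsb_source_filesystem_csv_bulk (
    file_name STRING,
    start_time STRING,
    oss_cell_id BIGINT,
    enodeb STRING,
    dl_payload FLOAT,
    rrc_conn_den BIGINT,
    rrc_conn_num BIGINT,
    pm_array_1 STRING
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///opt/kafka-python-exec/files/',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true',
    'csv.allow-comments' = 'true',
    -- Assumption: requires a Flink version whose filesystem connector
    -- documents 'source.monitor-interval'. With it set, the job stays
    -- in RUNNING state and picks up files added to the directory.
    'source.monitor-interval' = '10 s'
);
```

With this source table, the same `INSERT INTO ... SELECT ... FROM` job should remain in RUNNING state instead of finishing after the initial scan.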