Flink SQL CSV 连续流式传输

Flink SQL CSV Streaming Continuously

我正在创建 2 个 flink sql tables,1 个用于 CSV 文件系统,其他用于 kafka。目标是持续监控文件系统文件夹并将新的 csv 文件记录推送到 kafka 主题。但是我在下面编写的查询将 csv 文件记录推送一次,并且 flink 作业进入“已完成”模式,并且不会处理来自该文件夹的任何新文件。有人可以告诉我如何使用源和 csv 文件系统创建 flink sql 连续流,并将目标作为 Kafka。

Flink SQL创建Source-table

CREATE TABLE son_hsb_source_filesystem_csv_bulk(
    file_name STRING,
    start_time STRING,
    oss_cell_id BIGINT,
    enodeb STRING,
    dl_payload FLOAT,
    rrc_conn_den BIGINT,
    rrc_conn_num BIGINT,
    pm_array_1 STRING
) WITH (
    'connector' = 'filesystem', --Don't Change this
    'path' = 'file:///opt/kafka-python-exec/files/' , -- Change file name alone
    'format' = 'csv', --Don't Change this
    'format.ignore-parse-errors' = 'true', --Don't Change this
    'csv.ignore-parse-errors' = 'true', --Don't Change this
    'csv.allow-comments' = 'true' --Don't Change this

Flink SQL 创建目标 table

CREATE TABLE son_hsb_target_kafka_9092_filesystem_bulk_tests(
    file_name STRING,
    start_time STRING,
    oss_cell_id BIGINT,
    enodeb STRING,
    dl_payload FLOAT,
    rrc_conn_den BIGINT,
    rrc_conn_num BIGINT,
    pm_array_1 STRING
) WITH (
    'connector' = 'kafka',  --Don't Change this
    'topic' = 'son_hsb_target_kafka_9092_fs_bulk_data_tests',  -- Add any topic name you want
    'scan.startup.mode' = 'earliest-offset',  --Don't Change this
    'properties.bootstrap.servers' = 'localhost:9092', --Don't Change this
    'format' = 'json',  --Don't Change this
    'json.fail-on-missing-field' = 'false', --Don't Change this
    'json.ignore-parse-errors' = 'true' --Don't Change this

Flink SQL 创建一个 Streaming Job # 这会运行一次并进入 Finished 模式

INSERT INTO son_hsb_target_kafka_9092_filesystem_bulk_tests
SELECT file_name,start_time,oss_cell_id,enodeb,dl_payload,rrc_conn_den,rrc_conn_num,pm_array_1 FROM son_hsb_source_filesystem_csv_bulk

如何定义一个始终保持在“运行”状态并寻找新文件的 Streaming 作业。请提出建议。


File system sources for streaming is still under development. In the future, the community will add support for common streaming use cases, i.e., partition and directory monitoring.