如何将 CSV 作为 Streaming Table Source 加载到 PyFlink 中?

How can you load a CSV into PyFlink as a Streaming Table Source?

我正在尝试设置一个简单的游乐场环境来使用 Flink Python Table API。我最终尝试编写的工作将以 Kafka 或 Kenesis 队列为基础,但这使得玩弄想法(和测试)变得非常困难。

我可以愉快地从 CSV 文件加载并以批处理模式处理它。但我无法让它在流媒体模式下工作。我如何在 StreamingExecutionEnvironment 中做类似的事情(主要是为了让我可以玩 windows)。

我知道我需要让系统使用 EventTime(因为 ProcTime 会同时进入),但我无法找到设置它的方法。原则上我应该能够将 CSV 的其中一列设置为事件时间,但文档中不清楚如何执行此操作(或者如果可能的话)。

为了获得批处理执行测试 运行 我使用了下面的代码,它从 input.csv 读取并输出到 output.csv.

from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
    TableConfig,
    DataTypes,
    BatchTableEnvironment,
    StreamTableEnvironment,
)
from pyflink.table.descriptors import Schema, Csv, OldCsv, FileSystem
from pathlib import Path

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
root = Path(__file__).parent.resolve()
out_path = root / "output.csv"

try:
    out_path.unlink()
except:
    pass

from pyflink.table.window import Tumble

(
    t_env.connect(FileSystem().path(str(root / "input.csv")))
    .with_format(Csv())
    .with_schema(
        Schema().field("time", DataTypes.TIMESTAMP(3)).field("word", DataTypes.STRING())
    )
    .create_temporary_table("mySource")
)

(
    t_env.connect(FileSystem().path(str(out_path)))
    .with_format(Csv())
    .with_schema(
        Schema().field("word", DataTypes.STRING()).field("count", DataTypes.BIGINT())
    )
    .create_temporary_table("mySink")
)

(
    t_env.from_path("mySource")
    .group_by("word")
    .select("word, count(1) as count")
    .filter("count > 1")
    .insert_into("mySink")
)

t_env.execute("tutorial_job")

并且input.csv是

2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve

所以我的问题是我如何设置它以便它从同一个 CSV 中读取,但使用第一列作为事件时间并允许我编写如下代码:

(
    t_env.from_path("mySource")
    .window(Tumble.over("10.minutes").on("time").alias("w"))
    .group_by("w, word")
    .select("w, word, count(1) as count")
    .filter("count > 1")
    .insert_into("mySink")
)

任何帮助将不胜感激,我无法从文档中解决这个问题。我正在使用 python 3.7flink 1.11.1 .

您尝试过使用水印策略吗?如前所述 here, you need to use watermark strategies to use event time. For pyflink case, personally i think it is easier to declare it in the ddl format like this.

如果你使用描述符API,你可以通过模式指定一个字段是event-time字段:

.with_schema(  # declare the schema of the table
             Schema()
             .field("rowtime", DataTypes.TIMESTAMP())
             .rowtime(
                Rowtime()
                .timestamps_from_field("time")
                .watermarks_periodic_bounded(60000))
             .field("a", DataTypes.STRING())
             .field("b", DataTypes.STRING())
             .field("c", DataTypes.STRING())
         )

但是我还是推荐大家使用DDL,一方面更容易使用,另一方面现有的Descriptor存在一些bug API,社区正在讨论重构描述符 API