如何将 CSV 作为 Streaming Table Source 加载到 PyFlink 中?
How can you load a CSV into PyFlink as a Streaming Table Source?
我正在尝试设置一个简单的游乐场环境来使用 Flink Python Table API。我最终尝试编写的工作将以 Kafka 或 Kenesis 队列为基础,但这使得玩弄想法(和测试)变得非常困难。
我可以愉快地从 CSV 文件加载并以批处理模式处理它。但我无法让它在流媒体模式下工作。我如何在 StreamingExecutionEnvironment 中做类似的事情(主要是为了让我可以玩 windows)。
我知道我需要让系统使用 EventTime(因为 ProcTime 会同时进入),但我无法找到设置它的方法。原则上我应该能够将 CSV 的其中一列设置为事件时间,但文档中不清楚如何执行此操作(或者如果可能的话)。
为了获得批处理执行测试 运行 我使用了下面的代码,它从 input.csv
读取并输出到 output.csv
.
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
TableConfig,
DataTypes,
BatchTableEnvironment,
StreamTableEnvironment,
)
from pyflink.table.descriptors import Schema, Csv, OldCsv, FileSystem
from pathlib import Path
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
root = Path(__file__).parent.resolve()
out_path = root / "output.csv"
try:
out_path.unlink()
except:
pass
from pyflink.table.window import Tumble
(
t_env.connect(FileSystem().path(str(root / "input.csv")))
.with_format(Csv())
.with_schema(
Schema().field("time", DataTypes.TIMESTAMP(3)).field("word", DataTypes.STRING())
)
.create_temporary_table("mySource")
)
(
t_env.connect(FileSystem().path(str(out_path)))
.with_format(Csv())
.with_schema(
Schema().field("word", DataTypes.STRING()).field("count", DataTypes.BIGINT())
)
.create_temporary_table("mySink")
)
(
t_env.from_path("mySource")
.group_by("word")
.select("word, count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)
t_env.execute("tutorial_job")
并且input.csv是
2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve
所以我的问题是我如何设置它以便它从同一个 CSV 中读取,但使用第一列作为事件时间并允许我编写如下代码:
(
t_env.from_path("mySource")
.window(Tumble.over("10.minutes").on("time").alias("w"))
.group_by("w, word")
.select("w, word, count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)
任何帮助将不胜感激,我无法从文档中解决这个问题。我正在使用 python 3.7
和 flink 1.11.1
.
您尝试过使用水印策略吗?如前所述 here, you need to use watermark strategies to use event time. For pyflink case, personally i think it is easier to declare it in the ddl format like this.
如果你使用描述符API,你可以通过模式指定一个字段是event-time字段:
.with_schema( # declare the schema of the table
Schema()
.field("rowtime", DataTypes.TIMESTAMP())
.rowtime(
Rowtime()
.timestamps_from_field("time")
.watermarks_periodic_bounded(60000))
.field("a", DataTypes.STRING())
.field("b", DataTypes.STRING())
.field("c", DataTypes.STRING())
)
但是我还是推荐大家使用DDL,一方面更容易使用,另一方面现有的Descriptor存在一些bug API,社区正在讨论重构描述符 API
我正在尝试设置一个简单的游乐场环境来使用 Flink Python Table API。我最终尝试编写的工作将以 Kafka 或 Kenesis 队列为基础,但这使得玩弄想法(和测试)变得非常困难。
我可以愉快地从 CSV 文件加载并以批处理模式处理它。但我无法让它在流媒体模式下工作。我如何在 StreamingExecutionEnvironment 中做类似的事情(主要是为了让我可以玩 windows)。
我知道我需要让系统使用 EventTime(因为 ProcTime 会同时进入),但我无法找到设置它的方法。原则上我应该能够将 CSV 的其中一列设置为事件时间,但文档中不清楚如何执行此操作(或者如果可能的话)。
为了获得批处理执行测试 运行 我使用了下面的代码,它从 input.csv
读取并输出到 output.csv
.
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
TableConfig,
DataTypes,
BatchTableEnvironment,
StreamTableEnvironment,
)
from pyflink.table.descriptors import Schema, Csv, OldCsv, FileSystem
from pathlib import Path
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
root = Path(__file__).parent.resolve()
out_path = root / "output.csv"
try:
out_path.unlink()
except:
pass
from pyflink.table.window import Tumble
(
t_env.connect(FileSystem().path(str(root / "input.csv")))
.with_format(Csv())
.with_schema(
Schema().field("time", DataTypes.TIMESTAMP(3)).field("word", DataTypes.STRING())
)
.create_temporary_table("mySource")
)
(
t_env.connect(FileSystem().path(str(out_path)))
.with_format(Csv())
.with_schema(
Schema().field("word", DataTypes.STRING()).field("count", DataTypes.BIGINT())
)
.create_temporary_table("mySink")
)
(
t_env.from_path("mySource")
.group_by("word")
.select("word, count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)
t_env.execute("tutorial_job")
并且input.csv是
2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve
所以我的问题是我如何设置它以便它从同一个 CSV 中读取,但使用第一列作为事件时间并允许我编写如下代码:
(
t_env.from_path("mySource")
.window(Tumble.over("10.minutes").on("time").alias("w"))
.group_by("w, word")
.select("w, word, count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)
任何帮助将不胜感激,我无法从文档中解决这个问题。我正在使用 python 3.7
和 flink 1.11.1
.
您尝试过使用水印策略吗?如前所述 here, you need to use watermark strategies to use event time. For pyflink case, personally i think it is easier to declare it in the ddl format like this.
如果你使用描述符API,你可以通过模式指定一个字段是event-time字段:
.with_schema( # declare the schema of the table
Schema()
.field("rowtime", DataTypes.TIMESTAMP())
.rowtime(
Rowtime()
.timestamps_from_field("time")
.watermarks_periodic_bounded(60000))
.field("a", DataTypes.STRING())
.field("b", DataTypes.STRING())
.field("c", DataTypes.STRING())
)
但是我还是推荐大家使用DDL,一方面更容易使用,另一方面现有的Descriptor存在一些bug API,社区正在讨论重构描述符 API