PyFlink SQL 本地测试

PyFlink SQL local test

所以我有一个用 PyFlink 写的简单聚合作业 SQL API。该作业从 AWS kinesis 读取数据并将结果输出到 Kinesis。

我很好奇我是否可以使用 pytest 对我的管道进行单元测试?我猜我需要用文件系统连接器模拟源和接收器?但是如何创建一个本地 Flink 会话来 运行 pytest 中的作业?我们这里有最佳实践推荐吗?

谢谢!

你应该看看 PyFlink 本身的测试是如何实现的。它为实施 table 测试用例建立了各种基础 类; PyFlinkStreamTableTestCase might a good place to start. Using this it's possible to write tests like this one that I've copied from here:

    def test_sql_query(self):
        t_env = self.t_env
        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
        field_names = ["a", "b", "c"]
        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
        t_env.register_table_sink(
            "sinks",
            source_sink_utils.TestAppendSink(field_names, field_types))

        result = t_env.sql_query("select a + 1, b, c from %s" % source)
        result.execute_insert("sinks").wait()
        actual = source_sink_utils.results()

        expected = ['+I[2, Hi, Hello]', '+I[3, Hello, Hello]']
        self.assert_equals(actual, expected)

那个测试的来源还有很多。