Flink SQL (V 1.12.1) 无法从 Kinesis 流中读取 debezium 变更日志

Flink SQL (V 1.12.1) unable to read debezium changelog from Kinesis stream

我在从 Kinesis 流读取 Debezium 更改日志时遇到了一些问题。我能否深入了解如何使用 Flink SQL.

解析更改日志事件

下面是我尝试通过 Flink SQL 客户端

解析流的尝试
Flink SQL> CREATE TABLE test_table (
>   city_id INT,
>   country_id INT,
>   city STRING,
>   last_update timestamp
> )
> WITH (
>   'connector' = 'kinesis',
>   'stream' = 'kinesis.sakila.city',
>   'aws.region' = 'us-east-1',
>   'scan.stream.initpos' = 'TRIM_HORIZON',
>   'format' = 'debezium-json'
> );
[INFO] Table has been created.

Flink SQL> select * from test_table;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Kinesis consumer does not support DeserializationSchema that implements deserialization with a Collector. Unsupported DeserializationSchema: org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema

Flink文档中有一个table显示which connectors support each of the formats。您会在那里看到 Kinesis 连接器不支持 Debezium 变更日志格式。