Flink SQL (V 1.12.1) 无法从 Kinesis 流中读取 debezium 变更日志
Flink SQL (V 1.12.1) unable to read debezium changelog from Kinesis stream
我在从 Kinesis 流读取 Debezium 更改日志时遇到了一些问题。我能否深入了解如何使用 Flink SQL.
解析更改日志事件
下面是我尝试通过 Flink SQL 客户端
解析流的尝试
Flink SQL> CREATE TABLE test_table (
> city_id INT,
> country_id INT,
> city STRING,
> last_update timestamp
> )
> WITH (
> 'connector' = 'kinesis',
> 'stream' = 'kinesis.sakila.city',
> 'aws.region' = 'us-east-1',
> 'scan.stream.initpos' = 'TRIM_HORIZON',
> 'format' = 'debezium-json'
> );
[INFO] Table has been created.
Flink SQL> select * from test_table;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Kinesis consumer does not support DeserializationSchema that implements deserialization with a Collector. Unsupported DeserializationSchema: org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema
Flink文档中有一个table显示which connectors support each of the formats。您会在那里看到 Kinesis 连接器不支持 Debezium 变更日志格式。
我在从 Kinesis 流读取 Debezium 更改日志时遇到了一些问题。我能否深入了解如何使用 Flink SQL.
解析更改日志事件下面是我尝试通过 Flink SQL 客户端
解析流的尝试Flink SQL> CREATE TABLE test_table (
> city_id INT,
> country_id INT,
> city STRING,
> last_update timestamp
> )
> WITH (
> 'connector' = 'kinesis',
> 'stream' = 'kinesis.sakila.city',
> 'aws.region' = 'us-east-1',
> 'scan.stream.initpos' = 'TRIM_HORIZON',
> 'format' = 'debezium-json'
> );
[INFO] Table has been created.
Flink SQL> select * from test_table;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Kinesis consumer does not support DeserializationSchema that implements deserialization with a Collector. Unsupported DeserializationSchema: org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema
Flink文档中有一个table显示which connectors support each of the formats。您会在那里看到 Kinesis 连接器不支持 Debezium 变更日志格式。