How to use custom payload column with outbox transform on standalone Debezium?
I'm trying to run standalone Debezium with the outbox SMT, using a custom payload column (after) and an additional jsonb column (before), but the task throws an error:
debezium | 2019-05-21 23:07:50,267 ERROR || WorkerSourceTask{id=campaigns-outbox-connector-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
debezium | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
debezium | at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
debezium | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
debezium | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
debezium | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
debezium | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
debezium | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
debezium | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
debezium | at java.lang.Thread.run(Thread.java:748)
debezium | Caused by: org.apache.kafka.connect.errors.DataException: payload is not a valid field name
debezium | at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
debezium | at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
debezium | at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:98)
debezium | at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
debezium | ... 11 more
It looks to me like Debezium is trying to read a payload column from my campaigns.outbox table, even though I explicitly override the payload column in the transform settings of my worker config (./config/connect-campaigns-outbox.properties):
name=campaigns-outbox-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=postgres
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=campaigns
database.server.name=campaigns_api
table.whitelist=campaigns.outbox
transforms=outbox
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.field.event.key=aggregate_id
transforms.outbox.table.field.event.type=type
transforms.outbox.table.field.event.payload.id=aggregate_id
transforms.outbox.table.field.event.payload=after
transforms.outbox.table.fields.additional.placement=before:envelope
transforms.outbox.route.topic.replacement=media-platform.campaigns-api.${routedByValue}
transforms.outbox.route.by.field=type
My ./docker-compose.yaml:
services:
  debezium:
    container_name: debezium
    image: debezium/connect:0.9
    ports:
      - 8082:8082
    volumes:
      - ./config:/kafka/config # can't use $KAFKA_HOME here
      - ./offsets:/offsets
    command:
      - sh
      - -c
      - $$KAFKA_HOME/bin/connect-standalone.sh $$KAFKA_HOME/config/connect-standalone.properties $$KAFKA_HOME/config/connect-campaigns-outbox.properties
  postgres:
    container_name: postgres
    image: 'debezium/postgres:10-alpine'
    ports:
      - 5432:5432
    environment:
      - POSTGRES_DB=campaigns
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 3s
      retries: 7
  zookeeper:
    container_name: zookeeper
    hostname: zookeeper
    image: 'confluentinc/cp-zookeeper:3.1.1'
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
  kafka:
    container_name: kafka
    hostname: kafka
    image: 'confluentinc/cp-kafka:3.1.1'
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
  schema-registry:
    container_name: schema-registry
    hostname: schema-registry
    image: 'confluentinc/cp-schema-registry:3.1.1'
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    depends_on:
      - zookeeper
    ports:
      - "8081:8081"
My ./config/connect-standalone.properties:
bootstrap.servers=kafka:9092
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter.schema.registry.url=http://schema-registry:8081
offset.storage.file.filename=/offsets/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/kafka/connect
Minimal PG schema needed to reproduce the error:
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE SCHEMA IF NOT EXISTS campaigns;
CREATE TYPE campaigns.outbox_event_type AS ENUM (
  'campaign',
  'creative'
);
CREATE TABLE campaigns.outbox (
  id UUID PRIMARY KEY,
  type campaigns.outbox_event_type NOT NULL,
  aggregate_id TEXT NOT NULL,
  before JSONB,
  after JSONB
);
To reproduce the error:
insert into campaigns.outbox (id, type, aggregate_id, before, after) values (uuid_generate_v4(), 'campaign', '1', NULL, '{"id":1,"title":"teste","description":"teste description"}');
Am I doing something wrong, or should I report this issue to the JBoss team?
The option name for the payload column should be transforms.outbox.table.field.payload rather than transforms.outbox.table.field.event.payload (see the option definitions in EventRouterConfigDefinition).
I see the documentation states it differently; I think the code should be adjusted accordingly. I'll take care of that.
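For what it's worth, a sketch of the transform section with that renamed option, based on the connector properties above (untested; I'm assuming the event.payload.id option follows the same renaming pattern, which the answer doesn't explicitly confirm):

```properties
transforms=outbox
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.field.event.key=aggregate_id
transforms.outbox.table.field.event.type=type
# renamed per the answer: table.field.payload, not table.field.event.payload
transforms.outbox.table.field.payload=after
# assumed to follow the same pattern as table.field.payload
transforms.outbox.table.field.payload.id=aggregate_id
transforms.outbox.table.fields.additional.placement=before:envelope
transforms.outbox.route.topic.replacement=media-platform.campaigns-api.${routedByValue}
transforms.outbox.route.by.field=type
```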