如何在 debezium 中将 offset.commit.policy 设置为 AlwaysCommitOffsetPolicy?
How to set the offset.commit.policy to AlwaysCommitOffsetPolicy in debezium?
我创建了一个 Debezium Embedded 引擎来捕获 MySQL 变更数据。我想尽快提交偏移量。在代码中,创建的配置包括以下内容。
.with("offset.commit.policy",OffsetCommitPolicy.AlwaysCommitOffsetPolicy.class.getName())
运行 这个 returns, java.lang.NoSuchMethodException: io.debezium.embedded.spi.OffsetCommitPolicy$AlwaysCommitOffsetPolicy.<init>(io.debezium.config.Configuration)
但是,当我启动嵌入式引擎时,
.with("offset.commit.policy",OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName())
,嵌入式引擎工作正常。
请注意,class OffsetCommitPolicy.PeriodicCommitOffsetPolicy
构造函数包含配置参数,而 OffsetCommitPolicy.AlwaysCommitOffsetPolicy
不包含。
public PeriodicCommitOffsetPolicy(Configuration config) {
...
}
如何让 debezium 嵌入式引擎使用其 AlwaysCommitOffsetPolicy
?
感谢您的报告。这部分是错误(如果您能登录我们的 Jira,我们将不胜感激)。您可以通过调用专用方法嵌入式引擎构建器来解决此问题,例如`io.debezium.embedded.EmbeddedEngine.create().with(OffsetCommitPolicy.always())'
测试版本 1.4.0Final:
new EmbeddedEngine.BuilderImpl() // create builder
.using(config) // regular config
.using(OffsetCommitPolicy.always()) // explicit commit policy
.notifying(this::handleEvent) // even procesor
.build(); // and finally build!
我创建了一个 Debezium Embedded 引擎来捕获 MySQL 变更数据。我想尽快提交偏移量。在代码中,创建的配置包括以下内容。
.with("offset.commit.policy",OffsetCommitPolicy.AlwaysCommitOffsetPolicy.class.getName())
运行 这个 returns, java.lang.NoSuchMethodException: io.debezium.embedded.spi.OffsetCommitPolicy$AlwaysCommitOffsetPolicy.<init>(io.debezium.config.Configuration)
但是,当我启动嵌入式引擎时,
.with("offset.commit.policy",OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName())
,嵌入式引擎工作正常。
请注意,class OffsetCommitPolicy.PeriodicCommitOffsetPolicy
构造函数包含配置参数,而 OffsetCommitPolicy.AlwaysCommitOffsetPolicy
不包含。
public PeriodicCommitOffsetPolicy(Configuration config) {
...
}
如何让 debezium 嵌入式引擎使用其 AlwaysCommitOffsetPolicy
?
感谢您的报告。这部分是错误(如果您能登录我们的 Jira,我们将不胜感激)。您可以通过调用专用方法嵌入式引擎构建器来解决此问题,例如`io.debezium.embedded.EmbeddedEngine.create().with(OffsetCommitPolicy.always())'
测试版本 1.4.0Final:
new EmbeddedEngine.BuilderImpl() // create builder
.using(config) // regular config
.using(OffsetCommitPolicy.always()) // explicit commit policy
.notifying(this::handleEvent) // even procesor
.build(); // and finally build!