JDBC Source Connector Error: Transaction was aborted. It was wounded by a higher priority transaction due to conflict on keys in range

JDBC Source Connector Error: Transaction was aborted. It was wounded by a higher priority transaction due to conflict on keys in range

我正在使用 JDBC 源连接器和 JDBC 驱动程序来从 Google Cloud Spanner 收集数据到 Kafka。 我在 table 上使用“时间戳+递增”模式。 table 的主键包括 2 列(order_item_id 和 order_id)。 我使用 order_item_id 作为递增列,使用名为“updated_time”的列作为时间戳列。

我在启动connector的时候,有时会出现如下错误,但最终还是能拿到数据。

ERROR Failed to run query for table TimestampIncrementingTableQuerier{table="order_item", query='null', 
topicPrefix='test_', incrementingColumn='order_item_id', timestampColumns=[updated_time]}: {} 
(io.confluent.connect.jdbc.source.JdbcSourceTask:404)
com.google.cloud.spanner.jdbc.JdbcSqlExceptionFactory$JdbcAbortedDueToConcurrentModificationException: 
The transaction was aborted and could not be retried due to a concurrent modification
...
Caused by: com.google.cloud.spanner.AbortedDueToConcurrentModificationException: 
The transaction was aborted and could not be retried due to a concurrent modification
...
Suppressed: com.google.cloud.spanner.connection.AbstractBaseUnitOfWork$SpannerAsyncExecutionException: 
Execution failed for statement: 
SELECT * FROM `order_item` WHERE `order_item`.`updated_time` < @p1 AND ((`order_item`.`updated_time` = @p2 AND `order_item`.`order_item_id` > @p3) OR `order_item`.`updated_time` > @p4) ORDER BY `order_item`.`updated_time`,`order_item`.`order_item_id` ASC
...
Caused by: com.google.cloud.spanner.AbortedException: ABORTED: io.grpc.StatusRuntimeException: 
ABORTED: Transaction was aborted. It was wounded by a higher priority transaction due to conflict on keys in range [[5587892845991837697,5587892845991837702], [5587892845991837697,5587892845991837702]), column adjust in table order_item.
retry_delay {
  nanos: 12974238
}
 - Statement: 'SELECT * FROM `order_item` WHERE `order_item`.`updated_time` < @p1 AND ((`order_item`.`updated_time` = @p2 AND `order_item`.`order_item_id` > @p3) OR `order_item`.`updated_time` > @p4) ORDER BY `order_item`.`updated_time`,`order_item`.`order_item_id` ASC'
...

我想知道在我的案例中这个错误是怎么发生的。顺便说一句,即使有错误,连接器仍然可以在最后收集数据。任何人都可以帮忙吗?非常感谢!

我不确定您的整个管道是如何设置的,但该错误表明您正在 read/write 事务中执行查询。 Cloud Spanner 上的任何 read/write 事务都可以被 Cloud Spanner 中止,并可能导致您看到的错误。

如果您的管道仅从 Cloud Spanner 读取,最好的办法是将 JDBC 连接设置为 只读 自动提交 模式。您可以直接在 JDBC 连接 URL 中执行此操作,方法是将 readonly=trueautocommit=true 属性添加到 URL.

示例:

jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-database;readonly=true;autocommit=true

也可能是您使用的框架在 JDBC 连接打开后更改了它。在那种情况下,您应该看看是否可以在框架中更改它。但是在这种情况下,基于上述示例更改 JDBC URL 可能就足够了。

背景资料:

如果在关闭自动提交的情况下打开 JDBC 连接并且连接处于 read/write 模式,则在执行查询时将自动启动 read/write 事务。所有后续查询也将使用相同的 read/write 事务,直到在连接上调用 commit()。这是在 Cloud Spanner 上读取大量数据效率最低的方法,因此应尽可能避免。它还将导致事务中止,因为读取操作将锁定它正在读取的数据。