为什么在Kafka Connect中实际上需要时间戳+递增模式?

Why timestamp+incrementing mode is actually required in Kafka Connect?

我正在阅读有关 Kafka 连接(JDBC 来源)的文章,但我无法理解 timestamp+incrementing 模式的实际需求。如果 timestamp 模式可以获得 updates/inserts 一旦我们有一个 updatedAt 时间戳列,它可能不一定是唯一的,但它总是随着 updates/inserts.

增加

我在某处读到 timestamp+incrementing 为流式传输提供了一个全球唯一的 ID,但我不确定如果我将数据转储到 Kafka,然后从 Kafka 转储到 s3,将如何使用该 ID。我还读到,由于某些 early shutdown 程序,我们需要 incrementing 列,但我再次不确定这是否是一些内部 kafka 连接问题。

这是我在github上找到的用于实现时间戳+递增的代码。

 protected void timestampIncrementingWhereClause(ExpressionBuilder builder) {
// This version combines two possible conditions. The first checks timestamp == last
// timestamp and incrementing > last incrementing. The timestamp alone would include
// duplicates, but adding the incrementing condition ensures no duplicates, e.g. you would
// get only the row with id = 23:
//  timestamp 1234, id 22 <- last
//  timestamp 1234, id 23
// The second check only uses the timestamp > last timestamp. This covers everything new,
// even if it is an update of the existing row. If we previously had:
//  timestamp 1234, id 22 <- last
// and then these rows were written:
//  timestamp 1235, id 22
//  timestamp 1236, id 23
// We should capture both id = 22 (an update) and id = 23 (a new row)
builder.append(" WHERE ");
coalesceTimestampColumns(builder);
builder.append(" < ? AND ((");
coalesceTimestampColumns(builder);
builder.append(" = ? AND ");
builder.append(incrementingColumn);
builder.append(" > ?");
builder.append(") OR ");
coalesceTimestampColumns(builder);
builder.append(" > ?)");
builder.append(" ORDER BY ");
coalesceTimestampColumns(builder);
builder.append(",");
builder.append(incrementingColumn);
builder.append(" ASC");

}

提前感谢您的帮助!

mode=timestamp+incrementing 不是“必需的”。还有其他选择。

read somewhere that...

The documentation perhaps? 不需要此模式的地方。

如您所见,它被认为是一个唯一的 ID,可以防止在重新启动时读取相同的行。它有助于确定是否存在重复的时间戳,哪些可能已被读取。

S3 与问题或 JDBC 源连接器的工作原理无关