为什么在Kafka Connect中实际上需要时间戳+递增模式?
Why timestamp+incrementing mode is actually required in Kafka Connect?
我正在阅读有关 Kafka 连接(JDBC 来源)的文章,但我无法理解 timestamp+incrementing
模式的实际需求。如果 timestamp
模式可以获得 updates/inserts 一旦我们有一个 updatedAt
时间戳列,它可能不一定是唯一的,但它总是随着 updates/inserts.
增加
我在某处读到 timestamp+incrementing
为流式传输提供了一个全球唯一的 ID,但我不确定如果我将数据转储到 Kafka,然后从 Kafka 转储到 s3,将如何使用该 ID。我还读到,由于某些 early shutdown
程序,我们需要 incrementing
列,但我再次不确定这是否是一些内部 kafka 连接问题。
这是我在github上找到的用于实现时间戳+递增的代码。
protected void timestampIncrementingWhereClause(ExpressionBuilder builder) {
// This version combines two possible conditions. The first checks timestamp == last
// timestamp and incrementing > last incrementing. The timestamp alone would include
// duplicates, but adding the incrementing condition ensures no duplicates, e.g. you would
// get only the row with id = 23:
// timestamp 1234, id 22 <- last
// timestamp 1234, id 23
// The second check only uses the timestamp > last timestamp. This covers everything new,
// even if it is an update of the existing row. If we previously had:
// timestamp 1234, id 22 <- last
// and then these rows were written:
// timestamp 1235, id 22
// timestamp 1236, id 23
// We should capture both id = 22 (an update) and id = 23 (a new row)
builder.append(" WHERE ");
coalesceTimestampColumns(builder);
builder.append(" < ? AND ((");
coalesceTimestampColumns(builder);
builder.append(" = ? AND ");
builder.append(incrementingColumn);
builder.append(" > ?");
builder.append(") OR ");
coalesceTimestampColumns(builder);
builder.append(" > ?)");
builder.append(" ORDER BY ");
coalesceTimestampColumns(builder);
builder.append(",");
builder.append(incrementingColumn);
builder.append(" ASC");
}
提前感谢您的帮助!
mode=timestamp+incrementing
不是“必需的”。还有其他选择。
read somewhere that...
The documentation perhaps? 不需要此模式的地方。
如您所见,它被认为是一个唯一的 ID,可以防止在重新启动时读取相同的行。它有助于确定是否存在重复的时间戳,哪些可能已被读取。
S3 与问题或 JDBC 源连接器的工作原理无关
我正在阅读有关 Kafka 连接(JDBC 来源)的文章,但我无法理解 timestamp+incrementing
模式的实际需求。如果 timestamp
模式可以获得 updates/inserts 一旦我们有一个 updatedAt
时间戳列,它可能不一定是唯一的,但它总是随着 updates/inserts.
我在某处读到 timestamp+incrementing
为流式传输提供了一个全球唯一的 ID,但我不确定如果我将数据转储到 Kafka,然后从 Kafka 转储到 s3,将如何使用该 ID。我还读到,由于某些 early shutdown
程序,我们需要 incrementing
列,但我再次不确定这是否是一些内部 kafka 连接问题。
这是我在github上找到的用于实现时间戳+递增的代码。
protected void timestampIncrementingWhereClause(ExpressionBuilder builder) {
// This version combines two possible conditions. The first checks timestamp == last
// timestamp and incrementing > last incrementing. The timestamp alone would include
// duplicates, but adding the incrementing condition ensures no duplicates, e.g. you would
// get only the row with id = 23:
// timestamp 1234, id 22 <- last
// timestamp 1234, id 23
// The second check only uses the timestamp > last timestamp. This covers everything new,
// even if it is an update of the existing row. If we previously had:
// timestamp 1234, id 22 <- last
// and then these rows were written:
// timestamp 1235, id 22
// timestamp 1236, id 23
// We should capture both id = 22 (an update) and id = 23 (a new row)
builder.append(" WHERE ");
coalesceTimestampColumns(builder);
builder.append(" < ? AND ((");
coalesceTimestampColumns(builder);
builder.append(" = ? AND ");
builder.append(incrementingColumn);
builder.append(" > ?");
builder.append(") OR ");
coalesceTimestampColumns(builder);
builder.append(" > ?)");
builder.append(" ORDER BY ");
coalesceTimestampColumns(builder);
builder.append(",");
builder.append(incrementingColumn);
builder.append(" ASC");
}
提前感谢您的帮助!
mode=timestamp+incrementing
不是“必需的”。还有其他选择。
read somewhere that...
The documentation perhaps? 不需要此模式的地方。
如您所见,它被认为是一个唯一的 ID,可以防止在重新启动时读取相同的行。它有助于确定是否存在重复的时间戳,哪些可能已被读取。
S3 与问题或 JDBC 源连接器的工作原理无关