当主题有多个分区时,KTable-KTable 外键连接不生成所有消息
KTable-KTable foreign-key join not producing all messages when topics have more than one partition
请参阅下面的更新以了解可能的解决方法
我们的应用程序使用 2 个主题作为 KTables,执行左连接,并输出到一个主题。在测试期间,我们发现当我们的输出主题只有 1 个分区时,这会按预期工作。当我们增加分区数量时,我们注意到生成到输出主题的消息数量减少了。
我们在启动应用程序之前使用多个分区配置测试了这一理论。使用 1 个分区,我们可以看到 100% 的消息。对于 2,我们看到一些消息(少于 50%)。 10 个,我们几乎看不到任何东西(少于 10%)。
因为我们正在加入,所以从主题 1 使用的每条消息都应该写入我们的输出主题,但我们发现这并没有发生。消息似乎卡在了从 Ktables 的外键连接创建的“中间”主题中,但没有错误消息。
如有任何帮助,我们将不胜感激!
Service.java
@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
return (topicOne, topicTwo) ->
topicOne
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream();
}
build.gradle
plugins {
id 'org.springframework.boot' version '2.3.1.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}
...
ext {
set('springCloudVersion', "Hoxton.SR6")
}
...
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'
注意:由于 spring-cloud-stream
中包含的版本存在错误,我们排除了 org.apache.kafka 依赖项
application.yml
spring:
application:
name: app-name
stream:
bindings:
process-in-0:
destination: topic1
group: ${spring.application.name}
process-in-1:
destination: topic2
group: ${spring.application.name}
process-out-0:
destination: outputTopic
kafka:
streams:
binder:
applicationId: ${spring.application.name}
brokers: ${KAFKA_BROKERS}
configuration:
commit.interval.ms: 1000
producer:
acks: all
retries: 20
default:
key:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
value:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
min-partition-count: 2
测试场景:
举个具体的例子,如果我向主题1发布以下3条消息:
{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}
输出主题只会收到2条消息。
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
另外两个怎么了?似乎某些 key/value 对无法写入输出主题。重试这些“丢失”的消息也不起作用。
更新:
我能够通过将主题 1 作为 KStream 而不是 KTable 使用并在继续执行 KTable-KTable 连接之前调用 toTable()
来正常运行。我仍然不确定为什么我的原始解决方案不起作用,但希望此解决方法可以阐明实际问题。
@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
return (topicOne, topicTwo) ->
topicOne
.map(...)
.toTable()
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream();
}
在加入主题上选择键可能会有所帮助。主题的分区配置应该相同。
return (topicOne, topicTwo) ->
topicOne
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream().selectKey((key, value) -> key);
这是一个奇怪的问题,我从来没有听说过一些输出主题分区控制数据写入频率。但是我知道 toStream()
只在缓存满时才向下游写入数据,所以尝试设置 cache.max.bytes.buffering = 0
。
此外,KTable 仅保留每个键的最新记录,因此如果您对同一个键有多个值,则只会保留最新值并写入下游。
根据问题的描述,(左)KTable 输入主题中的数据似乎没有按其键正确分区。对于单个分区主题,那么,只有一个分区,所有数据都转到这个分区,并且连接结果完成。
但是,对于多分区的输入主题,您需要确保数据是按键分区的,否则具有相同键的两条记录可能会在不同的分区中结束,从而连接失败(因为连接是在每个分区的基础上完成的)。
请注意,即使外键连接不要求两个输入主题共同分区,仍然需要每个输入主题本身按其键分区!
如果您使用 map().toTable()
,您基本上会触发数据的内部重新分区,以确保数据按密钥进行分区,这可以解决问题。
我遇到了类似的问题。我有两个传入的 KStreams,我将其转换为 KTables,并执行了 KTable-KTable FK 连接。 Kafka 流完全没有产生任何记录,连接从未执行过。
重新分区 KStreams 对我不起作用。相反,我不得不手动将分区大小设置为 1。
这是一个无效示例:
注意我使用的是 Kotlin,还有一些 extension helper functions
fun enrichUsersData(
userDataStream: KStream<UserId, UserData>,
environmentDataStream: KStream<RealmId, EnvironmentMetaData>,
) {
// aggregate all users on a server into an aggregating DTO
val userDataTable: KTable<ServerId, AggregatedUserData> =
userDataStream
.groupBy { _: UserId, userData: UserData -> userData.serverId }
.aggregate({ AggregatedUserData }) { serverId: ServerId, userData: UserData, usersAggregate: AggregatedUserData ->
usersAggregate
.addUserData(userData)
.setServerId(serverId)
return@aggregate usersAggregate
}
// convert all incoming environment data into a KTable
val environmentDataTable: KTable<RealmId, EnvironmentMetaData> =
environmentDataStream
.toTable()
// Now, try to enrich the user's data with the environment data
// the KTable-KTable FK join is correctly configured, but...
val enrichedUsersData: KTable<ServerId, AggregatedUserData> =
userDataTable.join(
other = environmentDataTable,
tableJoined = tableJoined("enrich-user-data.join"),
materialized = materializedAs(
"enriched-user-data.store",
jsonMapper.serde(),
jsonMapper.serde(),
),
foreignKeyExtractor = { usersData: AggregatedUserData -> usersData.realmId },
) { usersData: AggregatedUserData, environmentData: EnvironmentMetaData ->
usersData.enrichUserData(environmentData)
// this join is never called!!
return@join usersData
}
}
如果我手动将分区大小设置为 1,则可以。
fun enrichUsersData(
userDataStream: KStream<UserId, UserData>,
environmentDataStream: KStream<RealmId, EnvironmentMetaData>,
) {
// manually set the partition size to 1 *before* creating the table
val userDataTable: KTable<ServerId, AggregatedUserData> =
userDataStream
.repartition(
repartitionedAs(
"user-data.pre-table-repartition",
jsonMapper.serde(),
jsonMapper.serde(),
numberOfPartitions = 1,
)
)
.groupBy { _: UserId, userData: UserData -> userData.serverId }
.aggregate({ AggregatedUserData }) { serverId: ServerId, userData: UserData, usersAggregate: AggregatedUserData ->
usersAggregate
.addUserData(userData)
.setServerId(serverId)
return@aggregate usersAggregate
}
// again, manually set the partition size to 1 *before* creating the table
val environmentDataTable: KTable<RealmId, EnvironmentMetaData> =
environmentDataStream
.repartition(
repartitionedAs(
"environment-metadata.pre-table-repartition",
jsonMapper.serde(),
jsonMapper.serde(),
numberOfPartitions = 1,
)
)
.toTable()
// this join now works as expected!
val enrichedUsersData: KTable<ServerId, AggregatedUserData> =
userDataTable.join(
other = environmentDataTable,
tableJoined = tableJoined("enrich-user-data.join"),
materialized = materializedAs(
"enriched-user-data.store",
jsonMapper.serde(),
jsonMapper.serde(),
),
foreignKeyExtractor = { usersData: AggregatedUserData -> usersData.realmId },
) { usersData: AggregatedUserData, environmentData: EnvironmentMetaData ->
usersData.enrichUserData(environmentData)
return@join usersData
}
}
请参阅下面的更新以了解可能的解决方法
我们的应用程序使用 2 个主题作为 KTables,执行左连接,并输出到一个主题。在测试期间,我们发现当我们的输出主题只有 1 个分区时,这会按预期工作。当我们增加分区数量时,我们注意到生成到输出主题的消息数量减少了。
我们在启动应用程序之前使用多个分区配置测试了这一理论。使用 1 个分区,我们可以看到 100% 的消息。对于 2,我们看到一些消息(少于 50%)。 10 个,我们几乎看不到任何东西(少于 10%)。
因为我们正在加入,所以从主题 1 使用的每条消息都应该写入我们的输出主题,但我们发现这并没有发生。消息似乎卡在了从 Ktables 的外键连接创建的“中间”主题中,但没有错误消息。
如有任何帮助,我们将不胜感激!
Service.java
@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
return (topicOne, topicTwo) ->
topicOne
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream();
}
build.gradle
plugins {
id 'org.springframework.boot' version '2.3.1.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}
...
ext {
set('springCloudVersion', "Hoxton.SR6")
}
...
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'
注意:由于 spring-cloud-stream
中包含的版本存在错误,我们排除了 org.apache.kafka 依赖项application.yml
spring:
application:
name: app-name
stream:
bindings:
process-in-0:
destination: topic1
group: ${spring.application.name}
process-in-1:
destination: topic2
group: ${spring.application.name}
process-out-0:
destination: outputTopic
kafka:
streams:
binder:
applicationId: ${spring.application.name}
brokers: ${KAFKA_BROKERS}
configuration:
commit.interval.ms: 1000
producer:
acks: all
retries: 20
default:
key:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
value:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
min-partition-count: 2
测试场景:
举个具体的例子,如果我向主题1发布以下3条消息:
{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}
输出主题只会收到2条消息。
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
另外两个怎么了?似乎某些 key/value 对无法写入输出主题。重试这些“丢失”的消息也不起作用。
更新:
我能够通过将主题 1 作为 KStream 而不是 KTable 使用并在继续执行 KTable-KTable 连接之前调用 toTable()
来正常运行。我仍然不确定为什么我的原始解决方案不起作用,但希望此解决方法可以阐明实际问题。
@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
return (topicOne, topicTwo) ->
topicOne
.map(...)
.toTable()
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream();
}
在加入主题上选择键可能会有所帮助。主题的分区配置应该相同。
return (topicOne, topicTwo) ->
topicOne
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream().selectKey((key, value) -> key);
这是一个奇怪的问题,我从来没有听说过一些输出主题分区控制数据写入频率。但是我知道 toStream()
只在缓存满时才向下游写入数据,所以尝试设置 cache.max.bytes.buffering = 0
。
此外,KTable 仅保留每个键的最新记录,因此如果您对同一个键有多个值,则只会保留最新值并写入下游。
根据问题的描述,(左)KTable 输入主题中的数据似乎没有按其键正确分区。对于单个分区主题,那么,只有一个分区,所有数据都转到这个分区,并且连接结果完成。
但是,对于多分区的输入主题,您需要确保数据是按键分区的,否则具有相同键的两条记录可能会在不同的分区中结束,从而连接失败(因为连接是在每个分区的基础上完成的)。
请注意,即使外键连接不要求两个输入主题共同分区,仍然需要每个输入主题本身按其键分区!
如果您使用 map().toTable()
,您基本上会触发数据的内部重新分区,以确保数据按密钥进行分区,这可以解决问题。
我遇到了类似的问题。我有两个传入的 KStreams,我将其转换为 KTables,并执行了 KTable-KTable FK 连接。 Kafka 流完全没有产生任何记录,连接从未执行过。
重新分区 KStreams 对我不起作用。相反,我不得不手动将分区大小设置为 1。
这是一个无效示例:
注意我使用的是 Kotlin,还有一些 extension helper functions
fun enrichUsersData(
userDataStream: KStream<UserId, UserData>,
environmentDataStream: KStream<RealmId, EnvironmentMetaData>,
) {
// aggregate all users on a server into an aggregating DTO
val userDataTable: KTable<ServerId, AggregatedUserData> =
userDataStream
.groupBy { _: UserId, userData: UserData -> userData.serverId }
.aggregate({ AggregatedUserData }) { serverId: ServerId, userData: UserData, usersAggregate: AggregatedUserData ->
usersAggregate
.addUserData(userData)
.setServerId(serverId)
return@aggregate usersAggregate
}
// convert all incoming environment data into a KTable
val environmentDataTable: KTable<RealmId, EnvironmentMetaData> =
environmentDataStream
.toTable()
// Now, try to enrich the user's data with the environment data
// the KTable-KTable FK join is correctly configured, but...
val enrichedUsersData: KTable<ServerId, AggregatedUserData> =
userDataTable.join(
other = environmentDataTable,
tableJoined = tableJoined("enrich-user-data.join"),
materialized = materializedAs(
"enriched-user-data.store",
jsonMapper.serde(),
jsonMapper.serde(),
),
foreignKeyExtractor = { usersData: AggregatedUserData -> usersData.realmId },
) { usersData: AggregatedUserData, environmentData: EnvironmentMetaData ->
usersData.enrichUserData(environmentData)
// this join is never called!!
return@join usersData
}
}
如果我手动将分区大小设置为 1,则可以。
fun enrichUsersData(
userDataStream: KStream<UserId, UserData>,
environmentDataStream: KStream<RealmId, EnvironmentMetaData>,
) {
// manually set the partition size to 1 *before* creating the table
val userDataTable: KTable<ServerId, AggregatedUserData> =
userDataStream
.repartition(
repartitionedAs(
"user-data.pre-table-repartition",
jsonMapper.serde(),
jsonMapper.serde(),
numberOfPartitions = 1,
)
)
.groupBy { _: UserId, userData: UserData -> userData.serverId }
.aggregate({ AggregatedUserData }) { serverId: ServerId, userData: UserData, usersAggregate: AggregatedUserData ->
usersAggregate
.addUserData(userData)
.setServerId(serverId)
return@aggregate usersAggregate
}
// again, manually set the partition size to 1 *before* creating the table
val environmentDataTable: KTable<RealmId, EnvironmentMetaData> =
environmentDataStream
.repartition(
repartitionedAs(
"environment-metadata.pre-table-repartition",
jsonMapper.serde(),
jsonMapper.serde(),
numberOfPartitions = 1,
)
)
.toTable()
// this join now works as expected!
val enrichedUsersData: KTable<ServerId, AggregatedUserData> =
userDataTable.join(
other = environmentDataTable,
tableJoined = tableJoined("enrich-user-data.join"),
materialized = materializedAs(
"enriched-user-data.store",
jsonMapper.serde(),
jsonMapper.serde(),
),
foreignKeyExtractor = { usersData: AggregatedUserData -> usersData.realmId },
) { usersData: AggregatedUserData, environmentData: EnvironmentMetaData ->
usersData.enrichUserData(environmentData)
return@join usersData
}
}