kafka streams DSL:添加一个选项参数以在使用 `map` `selectByKey` `groupBy` 时禁用重新分区
kafka streams DSL: add an option parameter to disable repartition when using `map` `selectByKey` `groupBy`
根据文档,应用时流将被标记为重新分区 map
selectKey
groupBy
即使新密钥已被适当分区。是否可以添加一个选项参数来禁用重新分区?
这是我的用户案例:
有一个主题已按 user_id
.
划分
# topic 'user', format '%key,%value'
partition-1:
user1,{'user_id':'user1', 'device_id':'device1'}
user1,{'user_id':'user1', 'device_id':'device1'}
user1,{'user_id':'user1', 'device_id':'device2'}
partition-2:
user2,{'user_id':'user2', 'device_id':'device3'}
user2,{'user_id':'user2', 'device_id':'device4'}
我想使用 DSL 计算 user_id-device_id 对,如下所示:
stream
.groupBy((user_id, value) -> {
JSONObject event = new JSONObject(value);
String userId = event.getString('user_id');
String deviceId = event.getString('device_id');
return String.format("%s&%s", userId,deviceId);
})
.count();
实际上新密钥已经被间接分区了。没有必要再做一次。
如果您使用.groupBy()
,它总是会导致数据重新分区。如果可能,请改用 groupByKey,它只会在需要时重新分区数据。
在您的情况下,您无论如何都在更改密钥,因此这将创建一个重新分区的主题。
根据文档,应用时流将被标记为重新分区 map
selectKey
groupBy
即使新密钥已被适当分区。是否可以添加一个选项参数来禁用重新分区?
这是我的用户案例:
有一个主题已按 user_id
.
# topic 'user', format '%key,%value'
partition-1:
user1,{'user_id':'user1', 'device_id':'device1'}
user1,{'user_id':'user1', 'device_id':'device1'}
user1,{'user_id':'user1', 'device_id':'device2'}
partition-2:
user2,{'user_id':'user2', 'device_id':'device3'}
user2,{'user_id':'user2', 'device_id':'device4'}
我想使用 DSL 计算 user_id-device_id 对,如下所示:
stream
.groupBy((user_id, value) -> {
JSONObject event = new JSONObject(value);
String userId = event.getString('user_id');
String deviceId = event.getString('device_id');
return String.format("%s&%s", userId,deviceId);
})
.count();
实际上新密钥已经被间接分区了。没有必要再做一次。
如果您使用.groupBy()
,它总是会导致数据重新分区。如果可能,请改用 groupByKey,它只会在需要时重新分区数据。
在您的情况下,您无论如何都在更改密钥,因此这将创建一个重新分区的主题。