在 Spring Boot 中实现 KStream / Table 的困惑
Confusion on implementing KStream / Table in Spring Boot
我正在尝试获取一些 Spring-Boot kafka 流 "action" 工作的样本,但我似乎完全搞糊涂了:)
我正在通过网络接收 JSON 数据。我在 avro 中构建了一个模式,用于序列化数据:
{
"UID": "XJ3_112",
"type": "11X",
"state": "PLATFORM_INITIALIZED",
"fuelremaining": 0,
"latitude": 50.1232,
"longitude": -119.257,
"altitude": 0,
"time": "2018-07-18T00:00:13.9966Z"
}
{
"platformUID": "BSG_SS_1_4",
"type": "OB_334_11",
"state": "ON_STATION",
"fuelremaining": -1,
"latitude": 56.1623,
"longitude": -44.5614,
"altitude": 519174,
"time": "2018-07-18T00:01:43.0871Z"
}
据我所知:
@Component
class KStreamTransformer {
@Autowired
private lateinit var objectMapper: ObjectMapper
@StreamListener(MyKafkaStreams.INPUT)
@SendTo(MyKafkaStreams.OUTPUT)
fun process(input: KStream<*, TestEntity>) : KStream<*, TestEntity> {
return input.flatMapValues{
value ->
val out = Arrays.asList(value)
out
}.groupBy() ???
}
}
我希望创建一个如下所示的 KTable:
|platformUID|state|Lat|Lon|Alt|
|------------|-----|---|---|---|
这就是我感到困惑的地方。
我假设我想在 PlatformUID
字段上做一个 GroupBy
,但我不清楚如何实际进行。
有人能给我指出正确的方向吗?
我想我正在寻找的是获取 input
流并将其转换为 KTable,键为 value.getUID()
,值为之前的值 [=18] =]
如果platformUID
已经是数据生产者使用的key,可以使用
KTable ktable = kstream
.groupByKey()
.reduce((oldValue, newValue) -> newValue)
如果没有,应该在.groupBy()
中放一个KeyValueMapper,看起来像
KTable ktable = kstream
.groupBy((k, v) -> v.getPlatformUID())
.reduce((oldValue, newValue) -> newValue)
它将创建一个内部主题,使用新密钥对源主题重新分区。
对于java7,KeyValueMapper的语法如下:
KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
public K apply(K key, V1 value) {
return key;
}
};
我正在尝试获取一些 Spring-Boot kafka 流 "action" 工作的样本,但我似乎完全搞糊涂了:)
我正在通过网络接收 JSON 数据。我在 avro 中构建了一个模式,用于序列化数据:
{
"UID": "XJ3_112",
"type": "11X",
"state": "PLATFORM_INITIALIZED",
"fuelremaining": 0,
"latitude": 50.1232,
"longitude": -119.257,
"altitude": 0,
"time": "2018-07-18T00:00:13.9966Z"
}
{
"platformUID": "BSG_SS_1_4",
"type": "OB_334_11",
"state": "ON_STATION",
"fuelremaining": -1,
"latitude": 56.1623,
"longitude": -44.5614,
"altitude": 519174,
"time": "2018-07-18T00:01:43.0871Z"
}
据我所知:
@Component
class KStreamTransformer {
@Autowired
private lateinit var objectMapper: ObjectMapper
@StreamListener(MyKafkaStreams.INPUT)
@SendTo(MyKafkaStreams.OUTPUT)
fun process(input: KStream<*, TestEntity>) : KStream<*, TestEntity> {
return input.flatMapValues{
value ->
val out = Arrays.asList(value)
out
}.groupBy() ???
}
}
我希望创建一个如下所示的 KTable:
|platformUID|state|Lat|Lon|Alt| |------------|-----|---|---|---|
这就是我感到困惑的地方。
我假设我想在 PlatformUID
字段上做一个 GroupBy
,但我不清楚如何实际进行。
有人能给我指出正确的方向吗?
我想我正在寻找的是获取 input
流并将其转换为 KTable,键为 value.getUID()
,值为之前的值 [=18] =]
如果platformUID
已经是数据生产者使用的key,可以使用
KTable ktable = kstream
.groupByKey()
.reduce((oldValue, newValue) -> newValue)
如果没有,应该在.groupBy()
中放一个KeyValueMapper,看起来像
KTable ktable = kstream
.groupBy((k, v) -> v.getPlatformUID())
.reduce((oldValue, newValue) -> newValue)
它将创建一个内部主题,使用新密钥对源主题重新分区。
对于java7,KeyValueMapper的语法如下:
KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
public K apply(K key, V1 value) {
return key;
}
};