在创建它的同一应用程序中查询 KTable
Query KTable in the same Application where it is created
我有一个 Kafka 流应用程序,我在其中读取主题,进行聚合并在 KTable 中具体化。然后我创建一个 Stream 和 运行 流上的一些逻辑。现在在流处理中,我想使用前面提到的 KTable 中的一些数据。启动流应用程序后,如何再次访问 KTable 流?我不想把 KTable 推到新的 Topic。
KStream<String, MyClass> source = builder.stream("my-topic");
KTable<Windowed<String>, Long> kTable =
source.groupBy((key, value) -> value.getKey(),
Grouped.<String, MyClass >as("repartition-1")
.withKeySerde(new Serdes.String())
.withValueSerde(new MyClassSerDes()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("test-store")
.withKeySerde(new Serdes.String())
.withValueSerde(Serdes.Long()));
这里我想使用kTable中的数据
inputstream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.count(Materialized.<myKey, Long, WindowStore<Bytes, byte[]>>as("str")
.withRetention(Duration.ofMinutes(30)))
.toStream()
.filter((k, v) -> {
// Here get the count for the previous Window.
// Use that count for some computation here.
}
您可以将 KTable
商店添加到 processor/transformer。对于您的情况,您可以将 filter
替换为 flatTransform
(或 transform
等任何兄弟,具体取决于您是否需要访问密钥)并将商店连接到运营商:
inputstream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.count(Materialized.<myKey, Long, WindowStore<Bytes, byte[]>>as("str")
.withRetention(Duration.ofMinutes(30))
)
.toStream()
// requires v2.2; otherwise use `transform()`
// if you don't need access to the key, consider to use `flatTransformValues` (v2.3)
.flatTransform(
() -> new Transformer<Windowed<myKey>,
Long,
List<KeyValue<Windowed<myKey>, Long>>() {
private ReadOnlyWindowStore<myKey, Long> store;
public void init(final ProcessorContext context) {
// get a handle on the store by its name
// as specified via `Materialized` above;
// should be read-only
store = (ReadOnlyWindowStore<myKey, Long>)context.getStateStore("str");
}
public List<KeyValue<Windowed<myKey>, Long>> transform(Windowed<myKey> key,
Long value) {
// access `store` as you wish to make a filtering decision
if ( ... ) {
// record passes
return Collection.singletonList(KeyValue.pair(key, value));
} else {
// drop record
return Collection.emptyList();
}
}
public void close() {} // nothing to do
},
"str" // connect the KTable store to the transformer using its name
// as specified via `Materialized` above
);
我有一个 Kafka 流应用程序,我在其中读取主题,进行聚合并在 KTable 中具体化。然后我创建一个 Stream 和 运行 流上的一些逻辑。现在在流处理中,我想使用前面提到的 KTable 中的一些数据。启动流应用程序后,如何再次访问 KTable 流?我不想把 KTable 推到新的 Topic。
KStream<String, MyClass> source = builder.stream("my-topic");
KTable<Windowed<String>, Long> kTable =
source.groupBy((key, value) -> value.getKey(),
Grouped.<String, MyClass >as("repartition-1")
.withKeySerde(new Serdes.String())
.withValueSerde(new MyClassSerDes()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("test-store")
.withKeySerde(new Serdes.String())
.withValueSerde(Serdes.Long()));
这里我想使用kTable中的数据
inputstream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.count(Materialized.<myKey, Long, WindowStore<Bytes, byte[]>>as("str")
.withRetention(Duration.ofMinutes(30)))
.toStream()
.filter((k, v) -> {
// Here get the count for the previous Window.
// Use that count for some computation here.
}
您可以将 KTable
商店添加到 processor/transformer。对于您的情况,您可以将 filter
替换为 flatTransform
(或 transform
等任何兄弟,具体取决于您是否需要访问密钥)并将商店连接到运营商:
inputstream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.count(Materialized.<myKey, Long, WindowStore<Bytes, byte[]>>as("str")
.withRetention(Duration.ofMinutes(30))
)
.toStream()
// requires v2.2; otherwise use `transform()`
// if you don't need access to the key, consider to use `flatTransformValues` (v2.3)
.flatTransform(
() -> new Transformer<Windowed<myKey>,
Long,
List<KeyValue<Windowed<myKey>, Long>>() {
private ReadOnlyWindowStore<myKey, Long> store;
public void init(final ProcessorContext context) {
// get a handle on the store by its name
// as specified via `Materialized` above;
// should be read-only
store = (ReadOnlyWindowStore<myKey, Long>)context.getStateStore("str");
}
public List<KeyValue<Windowed<myKey>, Long>> transform(Windowed<myKey> key,
Long value) {
// access `store` as you wish to make a filtering decision
if ( ... ) {
// record passes
return Collection.singletonList(KeyValue.pair(key, value));
} else {
// drop record
return Collection.emptyList();
}
}
public void close() {} // nothing to do
},
"str" // connect the KTable store to the transformer using its name
// as specified via `Materialized` above
);