如何加入两个 KTable 并将结果 ktable 写入状态存储
How to join two KTable and write the result ktable in state store
我有两个 KTable 对象:
KTable<Long, byte[]> firstTable = builder.table("firstTopic", Consumed.with(Serdes.Long(), Serdes.ByteArray()));
KTable<Long, byte[]> secondTable = builder.table("secondTopic",
Consumed.with(Serdes.Long(), Serdes.ByteArray()));
之后我想加入这两个 tables:
firstTable.leftJoin(secondTable,
(leftValue, rightValue) -> {
try {
return utils.serializeNetwork(utils.deserializeNetwork(leftValue));
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
)
所以我有两个 table,我将它们合并为一个 table,我希望将结果 table 按每个键存储在 kafka 状态存储中, 但我不知道该怎么做。
您可以通过在 leftJoin
上指定 Materialized
参数并指定状态存储的名称来强制实现本地存储。
firstTable.leftJoin(..., Materialized.as("my-store-name"));
我有两个 KTable 对象:
KTable<Long, byte[]> firstTable = builder.table("firstTopic", Consumed.with(Serdes.Long(), Serdes.ByteArray()));
KTable<Long, byte[]> secondTable = builder.table("secondTopic",
Consumed.with(Serdes.Long(), Serdes.ByteArray()));
之后我想加入这两个 tables:
firstTable.leftJoin(secondTable,
(leftValue, rightValue) -> {
try {
return utils.serializeNetwork(utils.deserializeNetwork(leftValue));
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
)
所以我有两个 table,我将它们合并为一个 table,我希望将结果 table 按每个键存储在 kafka 状态存储中, 但我不知道该怎么做。
您可以通过在 leftJoin
上指定 Materialized
参数并指定状态存储的名称来强制实现本地存储。
firstTable.leftJoin(..., Materialized.as("my-store-name"));