使用 Kafka Streams 构建缓存
Building a cache with Kafka Streams
我正在尝试了解使用 Kafka Streams
时的可能性以及如何思考。
用例:
有一个名为 Transactions 的主题(topic):
- key -> transactionReference(字符串)
- 值 -> 时间戳,approved/canceled(JSON 字符串)
我想创建一个缓存来保存所有最近的交易(最后 10 分钟)。
其余客户端可以通过提供事务引用来查询缓存。
问题:
- Kafka Streams(连同它的 Materialized 视图)是否适合实现这样的缓存?
- 如果是,你会怎么做?请记住,它只需要保留最后 10 分钟的交易并丢弃较早的交易。
- 如果不是,为什么不呢?
是的,在 kafka-streams
中开发它是个好主意。怎么做?
- 首先,创建代表缓存值的 class:
/**
 * Value cached per transaction reference: creation time, approved/canceled
 * status, and the reference itself (also used as the store key).
 */
class Transaction {
    // Fields stay package-private and non-final so the implicit no-arg
    // constructor remains available for JSON deserialization
    // (JsonSerdes.forA(Transaction.class) in the topology).
    Instant createTime;
    Status status;
    String transactionReference;

    // FIX: downstream code (TransactionsCache) calls these getters,
    // which the bare-field version did not declare.
    Instant getCreateTime() {
        return createTime;
    }

    Status getStatus() {
        return status;
    }

    String getTransactionReference() {
        return transactionReference;
    }
}
- 其次,创建处理缓存逻辑的 class - 实现
org.apache.kafka.streams.kstream.Transformer<K, V, R>
:
/**
 * Kafka Streams {@link Transformer} that maintains a rolling cache of recent
 * transactions in the "transactions-store" state store, evicting entries older
 * than 10 minutes via a wall-clock punctuator. Nothing is forwarded downstream;
 * the transformer exists only to populate the queryable store.
 */
public class TransactionsCache implements Transformer<String, Transaction, KeyValue<String, Transaction>> {
    // Retention window: entries whose createTime is older than this are evicted.
    private final long maintainDurationMs = TimeUnit.MINUTES.toMillis(10);
    // Local state store holding cached transactions, keyed by transaction reference.
    private KeyValueStore<String, Transaction> transactions;

    @Override
    public void init(ProcessorContext context) {
        this.transactions = context.getStateStore("transactions-store");
        // NOTE(review): Duration.ofMillis(5) runs a full-store scan every 5 ms,
        // which is very aggressive; a seconds/minutes interval was likely
        // intended — confirm before deploying.
        context.schedule(Duration.ofMillis(5), PunctuationType.WALL_CLOCK_TIME,
            timestamp -> {
                // FIX: KeyValueIterator is AutoCloseable and must be closed;
                // the original lambda leaked it on every punctuation.
                try (KeyValueIterator<String, Transaction> all = transactions.all()) {
                    all.forEachRemaining(kV -> {
                        if (hasExpired(kV.value.getCreateTime().toEpochMilli(), timestamp)) {
                            transactions.delete(kV.key);
                        }
                    });
                }
            });
    }

    /** True when the event is more than {@code maintainDurationMs} behind the given wall-clock time. */
    private boolean hasExpired(final long eventTimestamp, final long currentStreamTimeMs) {
        return (currentStreamTimeMs - eventTimestamp) > maintainDurationMs;
    }

    @Override
    public KeyValue<String, Transaction> transform(String key, Transaction transaction) {
        // First-write-wins: only insert when this reference is not already cached.
        // NOTE(review): the record 'key' is ignored in favor of
        // transaction.getTransactionReference(); per the question they should be
        // the same value — confirm.
        Transaction t = this.transactions.get(transaction.getTransactionReference());
        if (t == null) {
            transactions.put(transaction.getTransactionReference(), transaction);
        }
        // Nothing is emitted downstream; this is a store-populating sink.
        return null;
    }

    @Override
    public void close() {
        // No resources to release; the state store is managed by the Streams runtime.
    }
}
- 然后,在拓扑中注册transformer:
static StreamsBuilder buildKafkaStreamsTopology() {
StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, Transaction>> transferProcessKeyValueStore = Stores
.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(("transactions-store"), Serdes.String(), JsonSerdes.forA(Transaction.class));
builder.addStateStore(transferProcessKeyValueStore);
builder.stream(TRANSACTIONS, Consumed.with(Serdes.String(), JsonSerdes.forA(Transaction.class)))
.transform(TransactionsCache::new, "transactions-store");
return builder;
}
- 下一步是读取 http 控制器中的数据:
/** REST endpoint exposing the "transactions-store" state store for point lookups. */
@RestController
public class TransactionsController {

    // Streams runtime used to reach the locally materialized state store.
    private final KafkaStreams kafkaStreams;

    public TransactionsController(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    /**
     * Looks up a cached transaction by its reference; the body is empty when
     * the reference is unknown or has already been evicted.
     */
    @GetMapping(value = "/transactions/{transactionReference}", produces = MediaType.APPLICATION_JSON_VALUE)
    Transaction getTransaction(@PathVariable("transactionReference") String transactionReference) {
        StoreQueryParameters<ReadOnlyKeyValueStore<String, Transaction>> query =
            StoreQueryParameters.fromNameAndType("transactions-store", QueryableStoreTypes.keyValueStore());
        ReadOnlyKeyValueStore<String, Transaction> cache = kafkaStreams.store(query);
        return cache.get(transactionReference);
    }
}
- 最后一件事。请记住,内存中的状态存储默认是按分区划分的,因此如果应用程序运行多个实例,您需要添加一些 RPC 机制,以便在本地缓存未命中时从另一个实例获取数据(Kafka Interactive Queries,官方文档中有非常简洁的示例)。或者,第二种解决方案是使用
org.apache.kafka.streams.kstream.GlobalKTable<K, V>
我正在尝试了解使用 Kafka Streams
时的可能性以及如何思考。
用例:
有一个名为 Transactions 的主题(topic):
- key -> transactionReference(字符串)
- 值 -> 时间戳,approved/canceled(JSON 字符串)
我想创建一个缓存来保存所有最近的交易(最后 10 分钟)。
其余客户端可以通过提供事务引用来查询缓存。
问题:
- Kafka Streams(连同它的 Materialized 视图)是否适合实现这样的缓存?
- 如果是,你会怎么做?请记住,它只需要保留最后 10 分钟的交易并丢弃较早的交易。
- 如果不是,为什么不呢?
是的,在 kafka-streams
中开发它是个好主意。怎么做?
- 首先,创建代表缓存值的 class:
/**
 * Value cached per transaction reference: creation time, approved/canceled
 * status, and the reference itself (also used as the store key).
 */
class Transaction {
    // Fields stay package-private and non-final so the implicit no-arg
    // constructor remains available for JSON deserialization
    // (JsonSerdes.forA(Transaction.class) in the topology).
    Instant createTime;
    Status status;
    String transactionReference;

    // FIX: downstream code (TransactionsCache) calls these getters,
    // which the bare-field version did not declare.
    Instant getCreateTime() {
        return createTime;
    }

    Status getStatus() {
        return status;
    }

    String getTransactionReference() {
        return transactionReference;
    }
}
- 其次,创建处理缓存逻辑的 class - 实现
org.apache.kafka.streams.kstream.Transformer<K, V, R>
:
/**
 * Kafka Streams {@link Transformer} that maintains a rolling cache of recent
 * transactions in the "transactions-store" state store, evicting entries older
 * than 10 minutes via a wall-clock punctuator. Nothing is forwarded downstream;
 * the transformer exists only to populate the queryable store.
 */
public class TransactionsCache implements Transformer<String, Transaction, KeyValue<String, Transaction>> {
    // Retention window: entries whose createTime is older than this are evicted.
    private final long maintainDurationMs = TimeUnit.MINUTES.toMillis(10);
    // Local state store holding cached transactions, keyed by transaction reference.
    private KeyValueStore<String, Transaction> transactions;

    @Override
    public void init(ProcessorContext context) {
        this.transactions = context.getStateStore("transactions-store");
        // NOTE(review): Duration.ofMillis(5) runs a full-store scan every 5 ms,
        // which is very aggressive; a seconds/minutes interval was likely
        // intended — confirm before deploying.
        context.schedule(Duration.ofMillis(5), PunctuationType.WALL_CLOCK_TIME,
            timestamp -> {
                // FIX: KeyValueIterator is AutoCloseable and must be closed;
                // the original lambda leaked it on every punctuation.
                try (KeyValueIterator<String, Transaction> all = transactions.all()) {
                    all.forEachRemaining(kV -> {
                        if (hasExpired(kV.value.getCreateTime().toEpochMilli(), timestamp)) {
                            transactions.delete(kV.key);
                        }
                    });
                }
            });
    }

    /** True when the event is more than {@code maintainDurationMs} behind the given wall-clock time. */
    private boolean hasExpired(final long eventTimestamp, final long currentStreamTimeMs) {
        return (currentStreamTimeMs - eventTimestamp) > maintainDurationMs;
    }

    @Override
    public KeyValue<String, Transaction> transform(String key, Transaction transaction) {
        // First-write-wins: only insert when this reference is not already cached.
        // NOTE(review): the record 'key' is ignored in favor of
        // transaction.getTransactionReference(); per the question they should be
        // the same value — confirm.
        Transaction t = this.transactions.get(transaction.getTransactionReference());
        if (t == null) {
            transactions.put(transaction.getTransactionReference(), transaction);
        }
        // Nothing is emitted downstream; this is a store-populating sink.
        return null;
    }

    @Override
    public void close() {
        // No resources to release; the state store is managed by the Streams runtime.
    }
}
- 然后,在拓扑中注册transformer:
static StreamsBuilder buildKafkaStreamsTopology() {
StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, Transaction>> transferProcessKeyValueStore = Stores
.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(("transactions-store"), Serdes.String(), JsonSerdes.forA(Transaction.class));
builder.addStateStore(transferProcessKeyValueStore);
builder.stream(TRANSACTIONS, Consumed.with(Serdes.String(), JsonSerdes.forA(Transaction.class)))
.transform(TransactionsCache::new, "transactions-store");
return builder;
}
- 下一步是读取 http 控制器中的数据:
/** REST endpoint exposing the "transactions-store" state store for point lookups. */
@RestController
public class TransactionsController {

    // Streams runtime used to reach the locally materialized state store.
    private final KafkaStreams kafkaStreams;

    public TransactionsController(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    /**
     * Looks up a cached transaction by its reference; the body is empty when
     * the reference is unknown or has already been evicted.
     */
    @GetMapping(value = "/transactions/{transactionReference}", produces = MediaType.APPLICATION_JSON_VALUE)
    Transaction getTransaction(@PathVariable("transactionReference") String transactionReference) {
        StoreQueryParameters<ReadOnlyKeyValueStore<String, Transaction>> query =
            StoreQueryParameters.fromNameAndType("transactions-store", QueryableStoreTypes.keyValueStore());
        ReadOnlyKeyValueStore<String, Transaction> cache = kafkaStreams.store(query);
        return cache.get(transactionReference);
    }
}
- 最后一件事。请记住,内存中的状态存储默认是按分区划分的,因此如果应用程序运行多个实例,您需要添加一些 RPC 机制,以便在本地缓存未命中时从另一个实例获取数据(Kafka Interactive Queries,官方文档中有非常简洁的示例)。或者,第二种解决方案是使用
org.apache.kafka.streams.kstream.GlobalKTable<K, V>