Building a cache with Kafka Streams

I am trying to understand what is possible with Kafka Streams and how to reason about the following use case.

Use case:

There is a topic called Transactions:

I want to create a cache that holds all recent transactions (the last 10 minutes).

REST clients can then query the cache by providing a transaction reference.

Questions:

  1. Is Kafka Streams (together with its materialized views) a good fit for implementing such a cache?
  2. If so, how would you do it? Keep in mind that it only needs to retain the last 10 minutes of transactions and discard anything older.
  3. If not, why not?

Yes, building this with kafka-streams is a good idea. How to do it?

  1. First, create a class representing the cached value:
class Transaction {
    Instant createTime;            // event time used for expiry
    Status status;                 // application-defined enum
    String transactionReference;   // used as the cache key

    Instant getCreateTime() { return createTime; }
    String getTransactionReference() { return transactionReference; }
}
  2. Second, create a class that handles the cache logic by implementing org.apache.kafka.streams.kstream.Transformer<K, V, R>:
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class TransactionsCache implements Transformer<String, Transaction, KeyValue<String, Transaction>> {

    private final long maintainDurationMs = TimeUnit.MINUTES.toMillis(10);

    private KeyValueStore<String, Transaction> transactions;

    @Override
    public void init(ProcessorContext context) {
        // On Kafka versions before 2.7 this call needs an explicit cast to KeyValueStore.
        this.transactions = context.getStateStore("transactions-store");
        // Periodically scan the store and evict entries older than 10 minutes.
        // The 5 ms interval is very aggressive; tune it to your workload.
        context.schedule(Duration.ofMillis(5), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (KeyValueIterator<String, Transaction> iterator = transactions.all()) {
                iterator.forEachRemaining(kV -> {
                    if (hasExpired(kV.value.getCreateTime().toEpochMilli(), timestamp)) {
                        transactions.delete(kV.key);
                    }
                });
            }
        });
    }

    private boolean hasExpired(final long eventTimestampMs, final long currentWallClockMs) {
        return (currentWallClockMs - eventTimestampMs) > maintainDurationMs;
    }

    @Override
    public KeyValue<String, Transaction> transform(String key, Transaction transaction) {
        // Cache the transaction by its reference on first sight; nothing is forwarded downstream.
        Transaction t = this.transactions.get(transaction.getTransactionReference());
        if (t == null) {
            transactions.put(transaction.getTransactionReference(), transaction);
        }
        return null;
    }

    @Override
    public void close() {
        // Nothing to clean up; the state store is owned by Kafka Streams.
    }
}
  3. Then, register the transformer in the topology; a minimal sketch of starting the resulting topology follows the snippet:
    static StreamsBuilder buildKafkaStreamsTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // In-memory state store backing the cache; JsonSerdes.forA(...) is a custom
        // JSON serde factory from this example, not part of the Kafka Streams API.
        StoreBuilder<KeyValueStore<String, Transaction>> transferProcessKeyValueStore = Stores
            .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("transactions-store"),
                Serdes.String(), JsonSerdes.forA(Transaction.class));
        builder.addStateStore(transferProcessKeyValueStore);

        // Attach the transformer to the Transactions topic and give it access to the store.
        builder.stream(TRANSACTIONS, Consumed.with(Serdes.String(), JsonSerdes.forA(Transaction.class)))
            .transform(TransactionsCache::new, "transactions-store");

        return builder;
    }
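For completeness, here is a minimal sketch of starting the topology; the application.id and bootstrap.servers values are placeholders, and the resulting KafkaStreams instance is what gets injected into the controller in the next step:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transactions-cache");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder

KafkaStreams kafkaStreams = new KafkaStreams(buildKafkaStreamsTopology().build(), props);
kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));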
  4. The next step is to read the data in an HTTP controller; a note on store availability follows the snippet:
@RestController
public class TransactionsController {

    private final KafkaStreams kafkaStreams;

    public TransactionsController(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    @GetMapping(value = "/transactions/{transactionReference}", produces = MediaType.APPLICATION_JSON_VALUE)
    Transaction getTransaction(@PathVariable("transactionReference") String transactionReference) {
        ReadOnlyKeyValueStore<String, Transaction> store = kafkaStreams.store(
            StoreQueryParameters.fromNameAndType("transactions-store", QueryableStoreTypes.keyValueStore()));

        return store.get(transactionReference);
    }
}
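Note that kafkaStreams.store(...) throws InvalidStateStoreException while the application is still starting up or rebalancing. A small retry helper is a common workaround; this is just a sketch (waitForStore is a hypothetical method the controller could call instead of hitting the store directly), assuming a short blocking wait is acceptable for your clients:

private ReadOnlyKeyValueStore<String, Transaction> waitForStore() {
    while (true) {
        try {
            return kafkaStreams.store(
                StoreQueryParameters.fromNameAndType("transactions-store", QueryableStoreTypes.keyValueStore()));
        } catch (InvalidStateStoreException e) {
            // Store is not queryable yet (startup or rebalance): back off briefly and retry.
            try {
                Thread.sleep(100);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ie);
            }
        }
    }
}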

  5. One last thing. Keep in mind that the in-memory cache is partitioned by default, so if you run many instances of the application you need to add some RPC mechanism to fetch the data from another instance on a cache miss (Kafka Interactive Queries; there are some very neat examples of this pattern). The second solution is to use org.apache.kafka.streams.kstream.GlobalKTable<K, V>.
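A minimal sketch of that interactive-queries lookup (method names as of recent Kafka versions); it assumes each instance sets StreamsConfig.APPLICATION_SERVER_CONFIG to its own host:port, keeps that value in a thisInstanceHostInfo field, and has some HTTP client helper, here called fetchFromRemote - both names are assumptions for illustration:

// Ask Kafka Streams which instance currently hosts the key for "transactions-store".
KeyQueryMetadata metadata = kafkaStreams.queryMetadataForKey(
    "transactions-store", transactionReference, Serdes.String().serializer());
HostInfo owner = metadata.activeHost();

if (owner.equals(thisInstanceHostInfo)) {
    // The key lives on this instance: serve it from the local store as in the controller above.
    return store.get(transactionReference);
}
// Otherwise forward the request to the owning instance, e.g.
// GET http://{owner.host()}:{owner.port()}/transactions/{transactionReference}
return fetchFromRemote(owner, transactionReference);   // fetchFromRemote: hypothetical HTTP client helper

The GlobalKTable alternative avoids this extra hop because every instance materializes the whole Transactions topic locally, at the cost of more memory; the punctuator-based eviction shown above does not apply there, so old entries would have to be removed with tombstones on the topic instead.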