直接写入kafka state store

Writing directly to a kafka state store

我们已经开始试验 Kafka,看看它是否可以用来聚合我们的应用程序数据。我认为我们的用例与 Kafka 流相匹配,但我们不确定我们是否正确使用了该工具。我们构建的概念证明似乎按设计工作,我不确定我们是否正确使用了 API。

我们的概念验证是使用 kafka 流在输出主题中保持 运行有关程序信息的统计,例如

{ 
  "numberActive": 0, 
  "numberInactive": 0, 
  "lastLogin": "01-01-1970T00:00:00Z"  
}

计算计数很容易,它本质上是根据输入主题和输出字段执行比较和交换 (CAS) 操作。

本地状态包含给定密钥的最新程序。我们使用 TransformSupplier 加入针对状态存储的输入流和 运行 CAS 操作,它使用

明确地将数据写入状态存储
context.put(...)
context.commit();

这是对本地状态存储的适当使用吗?是否有另一种方法可以在主题中保持有状态 运行ning 计数?

你的设计对我来说是正确的(我假设你使用的是 PAPI 而不是 Streams DSL),你正在读取一个流,在状态存储与运算符关联的流上调用 transform() 。由于您的更新逻辑似乎只有 key-dependent,因此可以通过基于键分区的 Streams 库令人尴尬地并行化。

需要注意的一点是,您似乎在每次 put 调用后调用 "context.commit()",这不是推荐的模式。这是因为 commit() 操作是一个非常繁重的调用,涉及刷新状态存储、向 Kafka 代理发送提交偏移请求等,在每次调用时调用它会导致非常低的吞吐量。建议仅在处理完一堆记录后才调用 commit(),或者您可以仅依赖 Streams 配置 "commit.interval.ms" 依赖 Streams 库仅在每个时间间隔后在内部调用 commit()。请注意,这不会影响正常关闭时的处理语义,因为关闭时 Streams 将始终执行 commit() 调用。