直接写入kafka state store
Writing directly to a kafka state store
我们已经开始试验 Kafka,看看它是否可以用来聚合我们的应用程序数据。我认为我们的用例与 Kafka 流相匹配,但我们不确定我们是否正确使用了该工具。我们构建的概念证明似乎按设计工作,我不确定我们是否正确使用了 API。
我们的概念验证是使用 kafka 流在输出主题中保持 运行有关程序信息的统计,例如
{
"numberActive": 0,
"numberInactive": 0,
"lastLogin": "01-01-1970T00:00:00Z"
}
计算计数很容易,它本质上是根据输入主题和输出字段执行比较和交换 (CAS) 操作。
本地状态包含给定密钥的最新程序。我们使用 TransformSupplier 加入针对状态存储的输入流和 运行 CAS 操作,它使用
明确地将数据写入状态存储
context.put(...)
context.commit();
这是对本地状态存储的适当使用吗?是否有另一种方法可以在主题中保持有状态 运行ning 计数?
你的设计对我来说是正确的(我假设你使用的是 PAPI 而不是 Streams DSL),你正在读取一个流,在状态存储与运算符关联的流上调用 transform() 。由于您的更新逻辑似乎只有 key-dependent,因此可以通过基于键分区的 Streams 库令人尴尬地并行化。
需要注意的一点是,您似乎在每次 put 调用后调用 "context.commit()",这不是推荐的模式。这是因为 commit()
操作是一个非常繁重的调用,涉及刷新状态存储、向 Kafka 代理发送提交偏移请求等,在每次调用时调用它会导致非常低的吞吐量。建议仅在处理完一堆记录后才调用 commit(),或者您可以仅依赖 Streams 配置 "commit.interval.ms" 依赖 Streams 库仅在每个时间间隔后在内部调用 commit()。请注意,这不会影响正常关闭时的处理语义,因为关闭时 Streams 将始终执行 commit() 调用。
我们已经开始试验 Kafka,看看它是否可以用来聚合我们的应用程序数据。我认为我们的用例与 Kafka 流相匹配,但我们不确定我们是否正确使用了该工具。我们构建的概念证明似乎按设计工作,我不确定我们是否正确使用了 API。
我们的概念验证是使用 kafka 流在输出主题中保持 运行有关程序信息的统计,例如
{
"numberActive": 0,
"numberInactive": 0,
"lastLogin": "01-01-1970T00:00:00Z"
}
计算计数很容易,它本质上是根据输入主题和输出字段执行比较和交换 (CAS) 操作。
本地状态包含给定密钥的最新程序。我们使用 TransformSupplier 加入针对状态存储的输入流和 运行 CAS 操作,它使用
明确地将数据写入状态存储context.put(...)
context.commit();
这是对本地状态存储的适当使用吗?是否有另一种方法可以在主题中保持有状态 运行ning 计数?
你的设计对我来说是正确的(我假设你使用的是 PAPI 而不是 Streams DSL),你正在读取一个流,在状态存储与运算符关联的流上调用 transform() 。由于您的更新逻辑似乎只有 key-dependent,因此可以通过基于键分区的 Streams 库令人尴尬地并行化。
需要注意的一点是,您似乎在每次 put 调用后调用 "context.commit()",这不是推荐的模式。这是因为 commit()
操作是一个非常繁重的调用,涉及刷新状态存储、向 Kafka 代理发送提交偏移请求等,在每次调用时调用它会导致非常低的吞吐量。建议仅在处理完一堆记录后才调用 commit(),或者您可以仅依赖 Streams 配置 "commit.interval.ms" 依赖 Streams 库仅在每个时间间隔后在内部调用 commit()。请注意,这不会影响正常关闭时的处理语义,因为关闭时 Streams 将始终执行 commit() 调用。