Kafka Streams——在同一主题上获取 KTable 和 KStream 的最佳方式?
Kafka Streams – best way to get KTable and KStream on same topic?
我对 Kafka Streams (0.10.1.1) 有疑问。我正在尝试创建关于同一主题的 KStream
和 KTable
。
我尝试的第一种方法是简单地调用流的 KStreamBuilder
方法和同一主题的 table。这导致
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source.
好的,这似乎是 Kafka Streams 内置的一些限制。
我的第二种方法最初是创建一个 KTable
并在其上使用 toStream()
方法。这有 KTables
做一些内部 buffering/flushing 的问题,所以如果一个键像我的例子一样多次出现,输出流不会反映所有输入元素。这是一个问题,因为我正在计算键的出现次数。
似乎可行的方法是首先创建一个 KStream
,按键对其进行分组,然后 "reduce" 通过丢弃旧聚合并仅保留新值来对它进行分组。我对这种方法不太满意,因为 a) 它看起来很复杂,b) Reducer
接口没有指定哪个是已经聚合的值,哪个是新的。我按照惯例保留了第二个,但是......嗯。
所以问题归结为:有没有更好的方法?我是不是遗漏了一些非常明显的东西?
请记住,我不是在研究正确的用例 – 这只是我了解 Streams-API.
关于两次添加主题:这是不可能的,因为 Kafka Streams 应用程序是单个 "consumer group" 因此只能一次提交主题的偏移量,而两次添加主题将表明topic get的consumer两次(并且独立进步)。
对于方法 KTable#toStream()
,您可以通过 StreamsConfig
参数 cache.max.bytes.buffering == 0
禁用缓存。但是,这是一个全局设置,对所有 KTable
禁用 caching/deduplication(参见 http://docs.confluent.io/current/streams/developer-guide.html#memory-management)。
Update: Since Kafka 0.11 it's possible to disable caching for each KTable individually via Materialized
parameter.
groupBy
方法也有效,即使它需要一些样板文件。我们考虑将 KStream#toTable()
添加到 API 以简化此转换。是的,reduce
中的第二个参数是新值——因为 reduce 用于组合两个值,API 没有 "old" 和 "new" 的概念,因此参数没有这样的命名。
我对 Kafka Streams (0.10.1.1) 有疑问。我正在尝试创建关于同一主题的 KStream
和 KTable
。
我尝试的第一种方法是简单地调用流的 KStreamBuilder
方法和同一主题的 table。这导致
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source.
好的,这似乎是 Kafka Streams 内置的一些限制。
我的第二种方法最初是创建一个 KTable
并在其上使用 toStream()
方法。这有 KTables
做一些内部 buffering/flushing 的问题,所以如果一个键像我的例子一样多次出现,输出流不会反映所有输入元素。这是一个问题,因为我正在计算键的出现次数。
似乎可行的方法是首先创建一个 KStream
,按键对其进行分组,然后 "reduce" 通过丢弃旧聚合并仅保留新值来对它进行分组。我对这种方法不太满意,因为 a) 它看起来很复杂,b) Reducer
接口没有指定哪个是已经聚合的值,哪个是新的。我按照惯例保留了第二个,但是......嗯。
所以问题归结为:有没有更好的方法?我是不是遗漏了一些非常明显的东西?
请记住,我不是在研究正确的用例 – 这只是我了解 Streams-API.
关于两次添加主题:这是不可能的,因为 Kafka Streams 应用程序是单个 "consumer group" 因此只能一次提交主题的偏移量,而两次添加主题将表明topic get的consumer两次(并且独立进步)。
对于方法 KTable#toStream()
,您可以通过 StreamsConfig
参数 cache.max.bytes.buffering == 0
禁用缓存。但是,这是一个全局设置,对所有 KTable
禁用 caching/deduplication(参见 http://docs.confluent.io/current/streams/developer-guide.html#memory-management)。
Update: Since Kafka 0.11 it's possible to disable caching for each KTable individually via
Materialized
parameter.
groupBy
方法也有效,即使它需要一些样板文件。我们考虑将 KStream#toTable()
添加到 API 以简化此转换。是的,reduce
中的第二个参数是新值——因为 reduce 用于组合两个值,API 没有 "old" 和 "new" 的概念,因此参数没有这样的命名。