Kafka Streaming 任务和内部状态存储的管理
Kafka Streaming tasks and management of Internal state stores
假设我们在 2 台不同的机器(实例)上启动了 2 个 Streaming-Tasks,具有以下属性:-
public final static String applicationID = "StreamsPOC";
public final static String bootstrapServers = "10.21.22.56:9093";
public final static String topicname = "TestTransaction";
public final static String shipmentTopicName = "TestShipment";
public final static String RECORD_COUNT_STORE_NAME = "ProcessorONEStore";
并使用上述这些属性,stream-task 的定义如下所示:-
Map<String, String> changelogConfig = new HashMap();
changelogConfig.put("min.insyc.replicas", "1");
// Below line not working.
changelogConfig.put("topic", "myChangedTopicLog");
StoreBuilder kvStoreBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(AppConfigs.RECORD_COUNT_STORE_NAME),
AppSerdes.String(), AppSerdes.Integer()
).withLoggingEnabled(changelogConfig);
kStreamBuilder.addStateStore(kvStoreBuilder);
KStream<String, String> sourceKafkaStream = kStreamBuilder.stream
(AppConfigs.topicname, Consumed.with(AppSerdes.String(), AppSerdes.String()));
现在,正如我观察到的那样,kafka 在幕后创建了主题(为了备份内部状态存储),名称如下:- StreamsPOC-ProcessorONEStore-changelog
第一个问题是:- 两个不同的流任务是否维护和备份内部状态存储到同一主题?
第二个问题是 ;- 假设 Task-1 在 partition-1 上拾取并将 say 写入其本地内部状态存储并且 Task-2 开始在 Partition-2 上工作并且假设它还将 写入其本地各自的状态存储,那么它不会引发数据被覆盖的风险,因为这两个任务都将数据备份到相同的变更日志主题?
第三个问题是:- 如何将自定义名称指定为 Change-log-topic?
不胜感激!!
首先,对术语进行一些思考:术语“任务”在 Kafka Stream 中具有 well-define 的含义,作为用户,您不会自己创建任务。当您的程序被执行时,Kafka Streams 会创建 任务 ,它们是“独立的计算单元”并为您执行这些任务。 -- 我想,您所说的“任务”实际上是一个 KafkaStreams
客户端(称为 实例 )。
如果您使用相同的 application.id
启动多个 实例 ,它们将形成一个消费者组,它们将以 data-parallel 的方式分担负载。对于状态存储,每个实例将托管存储的 shard(有时也称为分区)。所有实例都使用相同的主题,并且该主题对每个存储分片都有一个分区。从存储分片到变更日志分区有一个 1:1 映射。此外,还有一个从输入主题分区到 任务 的 1:1 映射,以及任务和存储分片之间的 1:1 映射。因此,总体而言,它是一个 1:1:1:1 映射:对于每个输入主题分区,都会创建一个任务,每个任务都保存状态存储的一个分片,每个存储分片都由更新日志主题的一个分区支持. (即,底线是,输入主题分区的数量决定了您获得的并行任务和存储分片的数量,并且更改日志主题创建的分区数量与您的输入主题相同。)
- 所以是的,所有实例都使用相同的变更日志主题。
- 由于任务是通过分片和更新日志主题分区隔离的,因此它们不会相互覆盖。但是,任务的思想是每个任务处理不同的(non-overlapping)key-space,因此所有具有相同
<k1,...>
的记录应该由相同的任务处理。当然,这个规则可能会有例外,如果你的应用程序不使用 non-overlapping key-spaces 程序将被执行(当然,这取决于你的业务逻辑要求,这可能是正确的或不正确)。
- 您似乎已经这样做了:请注意,您只能自定义变更日志主题名称的一部分:
<application.id>-<storeName>-changelog
-- 即,您可以选择 application.id
和 storeName
。不过,总体命名模式是 hard-coded。
假设我们在 2 台不同的机器(实例)上启动了 2 个 Streaming-Tasks,具有以下属性:-
public final static String applicationID = "StreamsPOC";
public final static String bootstrapServers = "10.21.22.56:9093";
public final static String topicname = "TestTransaction";
public final static String shipmentTopicName = "TestShipment";
public final static String RECORD_COUNT_STORE_NAME = "ProcessorONEStore";
并使用上述这些属性,stream-task 的定义如下所示:-
Map<String, String> changelogConfig = new HashMap();
changelogConfig.put("min.insyc.replicas", "1");
// Below line not working.
changelogConfig.put("topic", "myChangedTopicLog");
StoreBuilder kvStoreBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(AppConfigs.RECORD_COUNT_STORE_NAME),
AppSerdes.String(), AppSerdes.Integer()
).withLoggingEnabled(changelogConfig);
kStreamBuilder.addStateStore(kvStoreBuilder);
KStream<String, String> sourceKafkaStream = kStreamBuilder.stream
(AppConfigs.topicname, Consumed.with(AppSerdes.String(), AppSerdes.String()));
现在,正如我观察到的那样,kafka 在幕后创建了主题(为了备份内部状态存储),名称如下:- StreamsPOC-ProcessorONEStore-changelog
第一个问题是:- 两个不同的流任务是否维护和备份内部状态存储到同一主题?
第二个问题是 ;- 假设 Task-1 在 partition-1 上拾取并将 say
第三个问题是:- 如何将自定义名称指定为 Change-log-topic?
不胜感激!!
首先,对术语进行一些思考:术语“任务”在 Kafka Stream 中具有 well-define 的含义,作为用户,您不会自己创建任务。当您的程序被执行时,Kafka Streams 会创建 任务 ,它们是“独立的计算单元”并为您执行这些任务。 -- 我想,您所说的“任务”实际上是一个 KafkaStreams
客户端(称为 实例 )。
如果您使用相同的 application.id
启动多个 实例 ,它们将形成一个消费者组,它们将以 data-parallel 的方式分担负载。对于状态存储,每个实例将托管存储的 shard(有时也称为分区)。所有实例都使用相同的主题,并且该主题对每个存储分片都有一个分区。从存储分片到变更日志分区有一个 1:1 映射。此外,还有一个从输入主题分区到 任务 的 1:1 映射,以及任务和存储分片之间的 1:1 映射。因此,总体而言,它是一个 1:1:1:1 映射:对于每个输入主题分区,都会创建一个任务,每个任务都保存状态存储的一个分片,每个存储分片都由更新日志主题的一个分区支持. (即,底线是,输入主题分区的数量决定了您获得的并行任务和存储分片的数量,并且更改日志主题创建的分区数量与您的输入主题相同。)
- 所以是的,所有实例都使用相同的变更日志主题。
- 由于任务是通过分片和更新日志主题分区隔离的,因此它们不会相互覆盖。但是,任务的思想是每个任务处理不同的(non-overlapping)key-space,因此所有具有相同
<k1,...>
的记录应该由相同的任务处理。当然,这个规则可能会有例外,如果你的应用程序不使用 non-overlapping key-spaces 程序将被执行(当然,这取决于你的业务逻辑要求,这可能是正确的或不正确)。 - 您似乎已经这样做了:请注意,您只能自定义变更日志主题名称的一部分:
<application.id>-<storeName>-changelog
-- 即,您可以选择application.id
和storeName
。不过,总体命名模式是 hard-coded。