Java 在源代码中设置 Kafka 保留时间
Java set Kafka retention time in source code
我有以下问题。我需要为某些选定的主题在 Kafka 中设置保留时间。我找到了一个解决方案,可以使用以下命令进行设置:
kafka-topics --zookeeper localhost:2181 --alter --topic topic-name --config retention.ms=-1
我检查了 Kafka 的 Web UI 并确认它已更改。
如果可以的话,我想自己在Java中设置保留时间,但是我好像找不到合适的class/configuration来设置时间。我以为我可以在 ProducerConfig class 中获取有关保留的信息,但我在那里找不到它。
是否可以在 Java 中设置保留时间,如果可能,我该如何完成?
提前致谢!
我之前从 Java 了解到您可以作为客户端登录。无法更改主题配置。
最近他们在新版本的 Kafka 中引入了这个,我认为这样做是可能的。
我现在的答案听起来有点模糊,但如果你想要完整的答案,你必须更具体。如果您拥有准确的 Kafka 版本、用于连接到 kafka 的库等,我可以扩展答案...
我认为从您的应用程序更改服务器端配置不是最好的主意。这是更基础的事情,不应该在 运行 应用程序 运行 时完成。
这对我有用:)
private void setRetentionTime(String topicName, int retentionTime) {
ConfigResource resource = new ConfigResource(Type.TOPIC, topicName);
Collection<ConfigEntry> entries = new ArrayList<>();
entries.add(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionTime)));
Config config = new Config(entries);
Map<ConfigResource, Config> configs = new HashMap<>();
configs.put(resource, config);
AdminClient client = kafkaConfig.createAdminClient();
client.alterConfigs(configs);
}
我有以下问题。我需要为某些选定的主题在 Kafka 中设置保留时间。我找到了一个解决方案,可以使用以下命令进行设置:
kafka-topics --zookeeper localhost:2181 --alter --topic topic-name --config retention.ms=-1
我检查了 Kafka 的 Web UI 并确认它已更改。
如果可以的话,我想自己在Java中设置保留时间,但是我好像找不到合适的class/configuration来设置时间。我以为我可以在 ProducerConfig class 中获取有关保留的信息,但我在那里找不到它。
是否可以在 Java 中设置保留时间,如果可能,我该如何完成?
提前致谢!
我之前从 Java 了解到您可以作为客户端登录。无法更改主题配置。
最近他们在新版本的 Kafka 中引入了这个,我认为这样做是可能的。
我现在的答案听起来有点模糊,但如果你想要完整的答案,你必须更具体。如果您拥有准确的 Kafka 版本、用于连接到 kafka 的库等,我可以扩展答案...
我认为从您的应用程序更改服务器端配置不是最好的主意。这是更基础的事情,不应该在 运行 应用程序 运行 时完成。
这对我有用:)
private void setRetentionTime(String topicName, int retentionTime) {
ConfigResource resource = new ConfigResource(Type.TOPIC, topicName);
Collection<ConfigEntry> entries = new ArrayList<>();
entries.add(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionTime)));
Config config = new Config(entries);
Map<ConfigResource, Config> configs = new HashMap<>();
configs.put(resource, config);
AdminClient client = kafkaConfig.createAdminClient();
client.alterConfigs(configs);
}